1use acp_thread::{AgentSessionInfo, AgentSessionList, AgentSessionListRequest, SessionListUpdate};
2use agent_client_protocol as acp;
3use gpui::Task;
4use std::rc::Rc;
5use ui::prelude::*;
6
/// Locally cached view of the sessions exposed by an [`AgentSessionList`].
///
/// The cache is filled by `refresh_sessions` and patched in place by
/// `apply_info_update` when the backing list reports per-session changes.
pub struct ThreadHistory {
    /// Backing list that sessions are loaded from and whose updates are watched.
    session_list: Rc<dyn AgentSessionList>,
    /// Sessions from the most recent load, with later info updates applied on top.
    sessions: Vec<AgentSessionInfo>,
    /// Most recently started load; overwritten each time a new load begins.
    /// NOTE(review): presumably overwriting drops (and thereby cancels) the
    /// previous load — confirm against gpui `Task` drop semantics.
    _refresh_task: Task<()>,
    /// Loop draining the list's update channel; `None` when `watch` yields no channel.
    _watch_task: Option<Task<()>>,
}
13
14impl ThreadHistory {
15 pub fn new(session_list: Rc<dyn AgentSessionList>, cx: &mut Context<Self>) -> Self {
16 let mut this = Self {
17 session_list,
18 sessions: Vec::new(),
19 _refresh_task: Task::ready(()),
20 _watch_task: None,
21 };
22
23 this.start_watching(cx);
24 this
25 }
26
27 #[cfg(any(test, feature = "test-support"))]
28 pub fn set_session_list(
29 &mut self,
30 session_list: Rc<dyn AgentSessionList>,
31 cx: &mut Context<Self>,
32 ) {
33 if Rc::ptr_eq(&self.session_list, &session_list) {
34 return;
35 }
36
37 self.session_list = session_list;
38 self.sessions.clear();
39 self._refresh_task = Task::ready(());
40 self.start_watching(cx);
41 }
42
43 fn start_watching(&mut self, cx: &mut Context<Self>) {
44 let Some(rx) = self.session_list.watch(cx) else {
45 self._watch_task = None;
46 self.refresh_sessions(false, cx);
47 return;
48 };
49 self.session_list.notify_refresh();
50
51 self._watch_task = Some(cx.spawn(async move |this, cx| {
52 while let Ok(first_update) = rx.recv().await {
53 let mut updates = vec![first_update];
54 while let Ok(update) = rx.try_recv() {
55 updates.push(update);
56 }
57
58 this.update(cx, |this, cx| {
59 let needs_refresh = updates
60 .iter()
61 .any(|u| matches!(u, SessionListUpdate::Refresh));
62
63 if needs_refresh {
64 this.refresh_sessions(false, cx);
65 } else {
66 for update in updates {
67 if let SessionListUpdate::SessionInfo { session_id, update } = update {
68 this.apply_info_update(session_id, update, cx);
69 }
70 }
71 }
72 })
73 .ok();
74 }
75 }));
76 }
77
78 fn apply_info_update(
79 &mut self,
80 session_id: acp::SessionId,
81 info_update: acp::SessionInfoUpdate,
82 cx: &mut Context<Self>,
83 ) {
84 let Some(session) = self
85 .sessions
86 .iter_mut()
87 .find(|s| s.session_id == session_id)
88 else {
89 return;
90 };
91
92 match info_update.title {
93 acp::MaybeUndefined::Value(title) => {
94 session.title = Some(title.into());
95 }
96 acp::MaybeUndefined::Null => {
97 session.title = None;
98 }
99 acp::MaybeUndefined::Undefined => {}
100 }
101 match info_update.updated_at {
102 acp::MaybeUndefined::Value(date_str) => {
103 if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&date_str) {
104 session.updated_at = Some(dt.with_timezone(&chrono::Utc));
105 }
106 }
107 acp::MaybeUndefined::Null => {
108 session.updated_at = None;
109 }
110 acp::MaybeUndefined::Undefined => {}
111 }
112 if let Some(meta) = info_update.meta {
113 session.meta = Some(meta);
114 }
115
116 cx.notify();
117 }
118
119 fn refresh_sessions(&mut self, load_all_pages: bool, cx: &mut Context<Self>) {
120 let session_list = self.session_list.clone();
121
122 self._refresh_task = cx.spawn(async move |this, cx| {
123 let mut cursor: Option<String> = None;
124 let mut is_first_page = true;
125
126 loop {
127 let request = AgentSessionListRequest {
128 cursor: cursor.clone(),
129 ..Default::default()
130 };
131 let task = cx.update(|cx| session_list.list_sessions(request, cx));
132 let response = match task.await {
133 Ok(response) => response,
134 Err(error) => {
135 log::error!("Failed to load session history: {error:#}");
136 return;
137 }
138 };
139
140 let acp_thread::AgentSessionListResponse {
141 sessions: page_sessions,
142 next_cursor,
143 ..
144 } = response;
145
146 this.update(cx, |this, cx| {
147 if is_first_page {
148 this.sessions = page_sessions;
149 } else {
150 this.sessions.extend(page_sessions);
151 }
152 cx.notify();
153 })
154 .ok();
155
156 is_first_page = false;
157 if !load_all_pages {
158 break;
159 }
160
161 match next_cursor {
162 Some(next_cursor) => {
163 if cursor.as_ref() == Some(&next_cursor) {
164 log::warn!(
165 "Session list pagination returned the same cursor; stopping to avoid a loop."
166 );
167 break;
168 }
169 cursor = Some(next_cursor);
170 }
171 None => break,
172 }
173 }
174 });
175 }
176
177 pub fn refresh(&mut self, _cx: &mut Context<Self>) {
178 self.session_list.notify_refresh();
179 }
180
181 pub fn session_for_id(&self, session_id: &acp::SessionId) -> Option<AgentSessionInfo> {
182 self.sessions
183 .iter()
184 .find(|entry| &entry.session_id == session_id)
185 .cloned()
186 }
187
188 pub(crate) fn sessions(&self) -> &[AgentSessionInfo] {
189 &self.sessions
190 }
191}
192
193#[cfg(test)]
194mod tests {
195 use super::*;
196 use acp_thread::AgentSessionListResponse;
197 use gpui::TestAppContext;
198 use std::{
199 any::Any,
200 sync::{Arc, Mutex},
201 };
202
203 fn init_test(cx: &mut TestAppContext) {
204 cx.update(|cx| {
205 let settings_store = settings::SettingsStore::test(cx);
206 cx.set_global(settings_store);
207 theme_settings::init(theme::LoadThemes::JustBase, cx);
208 });
209 }
210
    /// Test double for [`AgentSessionList`] that serves one fixed set of
    /// sessions and exposes an update channel tests can push into.
    #[derive(Clone)]
    struct TestSessionList {
        /// Sessions returned, cloned, by every `list_sessions` call.
        sessions: Vec<AgentSessionInfo>,
        /// Sending half of the update channel, used by `send_update`.
        updates_tx: smol::channel::Sender<SessionListUpdate>,
        /// Receiving half of the update channel; `watch` hands out clones of it.
        updates_rx: smol::channel::Receiver<SessionListUpdate>,
    }
217
218 impl TestSessionList {
219 fn new(sessions: Vec<AgentSessionInfo>) -> Self {
220 let (tx, rx) = smol::channel::unbounded();
221 Self {
222 sessions,
223 updates_tx: tx,
224 updates_rx: rx,
225 }
226 }
227
228 fn send_update(&self, update: SessionListUpdate) {
229 self.updates_tx.try_send(update).ok();
230 }
231 }
232
233 impl AgentSessionList for TestSessionList {
234 fn list_sessions(
235 &self,
236 _request: AgentSessionListRequest,
237 _cx: &mut App,
238 ) -> Task<anyhow::Result<AgentSessionListResponse>> {
239 Task::ready(Ok(AgentSessionListResponse::new(self.sessions.clone())))
240 }
241
242 fn watch(&self, _cx: &mut App) -> Option<smol::channel::Receiver<SessionListUpdate>> {
243 Some(self.updates_rx.clone())
244 }
245
246 fn notify_refresh(&self) {
247 self.send_update(SessionListUpdate::Refresh);
248 }
249
250 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
251 self
252 }
253 }
254
    /// Test double for [`AgentSessionList`] that serves two pages of sessions
    /// and records the cursor of every request it answers.
    #[derive(Clone)]
    struct PaginatedTestSessionList {
        /// Sessions returned for a request without a cursor.
        first_page_sessions: Vec<AgentSessionInfo>,
        /// Sessions returned for a request with the `"page-2"` cursor.
        second_page_sessions: Vec<AgentSessionInfo>,
        /// Cursors of answered requests, in the order the responses were produced.
        requested_cursors: Arc<Mutex<Vec<Option<String>>>>,
        /// When set, responses are produced from a spawned task after one yield
        /// instead of an already-ready task.
        async_responses: bool,
        /// Sending half of the update channel, used by `send_update`.
        updates_tx: smol::channel::Sender<SessionListUpdate>,
        /// Receiving half of the update channel; `watch` hands out clones of it.
        updates_rx: smol::channel::Receiver<SessionListUpdate>,
    }
264
265 impl PaginatedTestSessionList {
266 fn new(
267 first_page_sessions: Vec<AgentSessionInfo>,
268 second_page_sessions: Vec<AgentSessionInfo>,
269 ) -> Self {
270 let (tx, rx) = smol::channel::unbounded();
271 Self {
272 first_page_sessions,
273 second_page_sessions,
274 requested_cursors: Arc::new(Mutex::new(Vec::new())),
275 async_responses: false,
276 updates_tx: tx,
277 updates_rx: rx,
278 }
279 }
280
281 fn with_async_responses(mut self) -> Self {
282 self.async_responses = true;
283 self
284 }
285
286 fn requested_cursors(&self) -> Vec<Option<String>> {
287 self.requested_cursors.lock().unwrap().clone()
288 }
289
290 fn clear_requested_cursors(&self) {
291 self.requested_cursors.lock().unwrap().clear()
292 }
293
294 fn send_update(&self, update: SessionListUpdate) {
295 self.updates_tx.try_send(update).ok();
296 }
297 }
298
299 impl AgentSessionList for PaginatedTestSessionList {
300 fn list_sessions(
301 &self,
302 request: AgentSessionListRequest,
303 cx: &mut App,
304 ) -> Task<anyhow::Result<AgentSessionListResponse>> {
305 let requested_cursors = self.requested_cursors.clone();
306 let first_page_sessions = self.first_page_sessions.clone();
307 let second_page_sessions = self.second_page_sessions.clone();
308
309 let respond = move || {
310 requested_cursors
311 .lock()
312 .unwrap()
313 .push(request.cursor.clone());
314
315 match request.cursor.as_deref() {
316 None => AgentSessionListResponse {
317 sessions: first_page_sessions,
318 next_cursor: Some("page-2".to_string()),
319 meta: None,
320 },
321 Some("page-2") => AgentSessionListResponse::new(second_page_sessions),
322 _ => AgentSessionListResponse::new(Vec::new()),
323 }
324 };
325
326 if self.async_responses {
327 cx.foreground_executor().spawn(async move {
328 smol::future::yield_now().await;
329 Ok(respond())
330 })
331 } else {
332 Task::ready(Ok(respond()))
333 }
334 }
335
336 fn watch(&self, _cx: &mut App) -> Option<smol::channel::Receiver<SessionListUpdate>> {
337 Some(self.updates_rx.clone())
338 }
339
340 fn notify_refresh(&self) {
341 self.send_update(SessionListUpdate::Refresh);
342 }
343
344 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
345 self
346 }
347 }
348
349 fn test_session(session_id: &str, title: &str) -> AgentSessionInfo {
350 AgentSessionInfo {
351 session_id: acp::SessionId::new(session_id),
352 work_dirs: None,
353 title: Some(title.to_string().into()),
354 updated_at: None,
355 created_at: None,
356 meta: None,
357 }
358 }
359
360 #[gpui::test]
361 async fn test_refresh_only_loads_first_page_by_default(cx: &mut TestAppContext) {
362 init_test(cx);
363
364 let session_list = Rc::new(PaginatedTestSessionList::new(
365 vec![test_session("session-1", "First")],
366 vec![test_session("session-2", "Second")],
367 ));
368
369 let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
370 cx.run_until_parked();
371
372 history.update(cx, |history, _cx| {
373 assert_eq!(history.sessions.len(), 1);
374 assert_eq!(
375 history.sessions[0].session_id,
376 acp::SessionId::new("session-1")
377 );
378 });
379 assert_eq!(session_list.requested_cursors(), vec![None]);
380 }
381
382 #[gpui::test]
383 async fn test_enabling_full_pagination_loads_all_pages(cx: &mut TestAppContext) {
384 init_test(cx);
385
386 let session_list = Rc::new(PaginatedTestSessionList::new(
387 vec![test_session("session-1", "First")],
388 vec![test_session("session-2", "Second")],
389 ));
390
391 let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
392 cx.run_until_parked();
393 session_list.clear_requested_cursors();
394
395 history.update(cx, |history, cx| history.refresh_sessions(true, cx));
396 cx.run_until_parked();
397
398 history.update(cx, |history, _cx| {
399 assert_eq!(history.sessions.len(), 2);
400 assert_eq!(
401 history.sessions[0].session_id,
402 acp::SessionId::new("session-1")
403 );
404 assert_eq!(
405 history.sessions[1].session_id,
406 acp::SessionId::new("session-2")
407 );
408 });
409 assert_eq!(
410 session_list.requested_cursors(),
411 vec![None, Some("page-2".to_string())]
412 );
413 }
414
415 #[gpui::test]
416 async fn test_standard_refresh_replaces_with_first_page_after_full_history_refresh(
417 cx: &mut TestAppContext,
418 ) {
419 init_test(cx);
420
421 let session_list = Rc::new(PaginatedTestSessionList::new(
422 vec![test_session("session-1", "First")],
423 vec![test_session("session-2", "Second")],
424 ));
425
426 let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
427 cx.run_until_parked();
428
429 history.update(cx, |history, cx| history.refresh_sessions(true, cx));
430 cx.run_until_parked();
431 session_list.clear_requested_cursors();
432
433 history.update(cx, |history, cx| {
434 history.refresh(cx);
435 });
436 cx.run_until_parked();
437
438 history.update(cx, |history, _cx| {
439 assert_eq!(history.sessions.len(), 1);
440 assert_eq!(
441 history.sessions[0].session_id,
442 acp::SessionId::new("session-1")
443 );
444 });
445 assert_eq!(session_list.requested_cursors(), vec![None]);
446 }
447
448 #[gpui::test]
449 async fn test_re_entering_full_pagination_reloads_all_pages(cx: &mut TestAppContext) {
450 init_test(cx);
451
452 let session_list = Rc::new(PaginatedTestSessionList::new(
453 vec![test_session("session-1", "First")],
454 vec![test_session("session-2", "Second")],
455 ));
456
457 let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
458 cx.run_until_parked();
459
460 history.update(cx, |history, cx| history.refresh_sessions(true, cx));
461 cx.run_until_parked();
462 session_list.clear_requested_cursors();
463
464 history.update(cx, |history, cx| history.refresh_sessions(true, cx));
465 cx.run_until_parked();
466
467 history.update(cx, |history, _cx| {
468 assert_eq!(history.sessions.len(), 2);
469 });
470 assert_eq!(
471 session_list.requested_cursors(),
472 vec![None, Some("page-2".to_string())]
473 );
474 }
475
476 #[gpui::test]
477 async fn test_partial_refresh_batch_drops_non_first_page_sessions(cx: &mut TestAppContext) {
478 init_test(cx);
479
480 let second_page_session_id = acp::SessionId::new("session-2");
481 let session_list = Rc::new(PaginatedTestSessionList::new(
482 vec![test_session("session-1", "First")],
483 vec![test_session("session-2", "Second")],
484 ));
485
486 let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
487 cx.run_until_parked();
488
489 history.update(cx, |history, cx| history.refresh_sessions(true, cx));
490 cx.run_until_parked();
491
492 session_list.clear_requested_cursors();
493
494 session_list.send_update(SessionListUpdate::SessionInfo {
495 session_id: second_page_session_id.clone(),
496 update: acp::SessionInfoUpdate::new().title("Updated Second"),
497 });
498 session_list.send_update(SessionListUpdate::Refresh);
499 cx.run_until_parked();
500
501 history.update(cx, |history, _cx| {
502 assert_eq!(history.sessions.len(), 1);
503 assert_eq!(
504 history.sessions[0].session_id,
505 acp::SessionId::new("session-1")
506 );
507 assert!(
508 history
509 .sessions
510 .iter()
511 .all(|session| session.session_id != second_page_session_id)
512 );
513 });
514 assert_eq!(session_list.requested_cursors(), vec![None]);
515 }
516
517 #[gpui::test]
518 async fn test_full_pagination_works_with_async_page_fetches(cx: &mut TestAppContext) {
519 init_test(cx);
520
521 let session_list = Rc::new(
522 PaginatedTestSessionList::new(
523 vec![test_session("session-1", "First")],
524 vec![test_session("session-2", "Second")],
525 )
526 .with_async_responses(),
527 );
528
529 let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
530 cx.run_until_parked();
531 session_list.clear_requested_cursors();
532
533 history.update(cx, |history, cx| history.refresh_sessions(true, cx));
534 cx.run_until_parked();
535
536 history.update(cx, |history, _cx| {
537 assert_eq!(history.sessions.len(), 2);
538 });
539 assert_eq!(
540 session_list.requested_cursors(),
541 vec![None, Some("page-2".to_string())]
542 );
543 }
544
545 #[gpui::test]
546 async fn test_apply_info_update_title(cx: &mut TestAppContext) {
547 init_test(cx);
548
549 let session_id = acp::SessionId::new("test-session");
550 let sessions = vec![AgentSessionInfo {
551 session_id: session_id.clone(),
552 work_dirs: None,
553 title: Some("Original Title".into()),
554 updated_at: None,
555 created_at: None,
556 meta: None,
557 }];
558 let session_list = Rc::new(TestSessionList::new(sessions));
559
560 let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
561 cx.run_until_parked();
562
563 session_list.send_update(SessionListUpdate::SessionInfo {
564 session_id: session_id.clone(),
565 update: acp::SessionInfoUpdate::new().title("New Title"),
566 });
567 cx.run_until_parked();
568
569 history.update(cx, |history, _cx| {
570 let session = history.sessions.iter().find(|s| s.session_id == session_id);
571 assert_eq!(
572 session.unwrap().title.as_ref().map(|s| s.as_ref()),
573 Some("New Title")
574 );
575 });
576 }
577
578 #[gpui::test]
579 async fn test_apply_info_update_clears_title_with_null(cx: &mut TestAppContext) {
580 init_test(cx);
581
582 let session_id = acp::SessionId::new("test-session");
583 let sessions = vec![AgentSessionInfo {
584 session_id: session_id.clone(),
585 work_dirs: None,
586 title: Some("Original Title".into()),
587 updated_at: None,
588 created_at: None,
589 meta: None,
590 }];
591 let session_list = Rc::new(TestSessionList::new(sessions));
592
593 let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
594 cx.run_until_parked();
595
596 session_list.send_update(SessionListUpdate::SessionInfo {
597 session_id: session_id.clone(),
598 update: acp::SessionInfoUpdate::new().title(None::<String>),
599 });
600 cx.run_until_parked();
601
602 history.update(cx, |history, _cx| {
603 let session = history.sessions.iter().find(|s| s.session_id == session_id);
604 assert_eq!(session.unwrap().title, None);
605 });
606 }
607
608 #[gpui::test]
609 async fn test_apply_info_update_ignores_undefined_fields(cx: &mut TestAppContext) {
610 init_test(cx);
611
612 let session_id = acp::SessionId::new("test-session");
613 let sessions = vec![AgentSessionInfo {
614 session_id: session_id.clone(),
615 work_dirs: None,
616 title: Some("Original Title".into()),
617 updated_at: None,
618 created_at: None,
619 meta: None,
620 }];
621 let session_list = Rc::new(TestSessionList::new(sessions));
622
623 let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
624 cx.run_until_parked();
625
626 session_list.send_update(SessionListUpdate::SessionInfo {
627 session_id: session_id.clone(),
628 update: acp::SessionInfoUpdate::new(),
629 });
630 cx.run_until_parked();
631
632 history.update(cx, |history, _cx| {
633 let session = history.sessions.iter().find(|s| s.session_id == session_id);
634 assert_eq!(
635 session.unwrap().title.as_ref().map(|s| s.as_ref()),
636 Some("Original Title")
637 );
638 });
639 }
640
641 #[gpui::test]
642 async fn test_multiple_info_updates_applied_in_order(cx: &mut TestAppContext) {
643 init_test(cx);
644
645 let session_id = acp::SessionId::new("test-session");
646 let sessions = vec![AgentSessionInfo {
647 session_id: session_id.clone(),
648 work_dirs: None,
649 title: None,
650 updated_at: None,
651 created_at: None,
652 meta: None,
653 }];
654 let session_list = Rc::new(TestSessionList::new(sessions));
655
656 let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
657 cx.run_until_parked();
658
659 session_list.send_update(SessionListUpdate::SessionInfo {
660 session_id: session_id.clone(),
661 update: acp::SessionInfoUpdate::new().title("First Title"),
662 });
663 session_list.send_update(SessionListUpdate::SessionInfo {
664 session_id: session_id.clone(),
665 update: acp::SessionInfoUpdate::new().title("Second Title"),
666 });
667 cx.run_until_parked();
668
669 history.update(cx, |history, _cx| {
670 let session = history.sessions.iter().find(|s| s.session_id == session_id);
671 assert_eq!(
672 session.unwrap().title.as_ref().map(|s| s.as_ref()),
673 Some("Second Title")
674 );
675 });
676 }
677
678 #[gpui::test]
679 async fn test_refresh_supersedes_info_updates(cx: &mut TestAppContext) {
680 init_test(cx);
681
682 let session_id = acp::SessionId::new("test-session");
683 let sessions = vec![AgentSessionInfo {
684 session_id: session_id.clone(),
685 work_dirs: None,
686 title: Some("Server Title".into()),
687 updated_at: None,
688 created_at: None,
689 meta: None,
690 }];
691 let session_list = Rc::new(TestSessionList::new(sessions));
692
693 let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
694 cx.run_until_parked();
695
696 session_list.send_update(SessionListUpdate::SessionInfo {
697 session_id: session_id.clone(),
698 update: acp::SessionInfoUpdate::new().title("Local Update"),
699 });
700 session_list.send_update(SessionListUpdate::Refresh);
701 cx.run_until_parked();
702
703 history.update(cx, |history, _cx| {
704 let session = history.sessions.iter().find(|s| s.session_id == session_id);
705 assert_eq!(
706 session.unwrap().title.as_ref().map(|s| s.as_ref()),
707 Some("Server Title")
708 );
709 });
710 }
711
712 #[gpui::test]
713 async fn test_info_update_for_unknown_session_is_ignored(cx: &mut TestAppContext) {
714 init_test(cx);
715
716 let session_id = acp::SessionId::new("known-session");
717 let sessions = vec![AgentSessionInfo {
718 session_id,
719 work_dirs: None,
720 title: Some("Original".into()),
721 updated_at: None,
722 created_at: None,
723 meta: None,
724 }];
725 let session_list = Rc::new(TestSessionList::new(sessions));
726
727 let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
728 cx.run_until_parked();
729
730 session_list.send_update(SessionListUpdate::SessionInfo {
731 session_id: acp::SessionId::new("unknown-session"),
732 update: acp::SessionInfoUpdate::new().title("Should Be Ignored"),
733 });
734 cx.run_until_parked();
735
736 history.update(cx, |history, _cx| {
737 assert_eq!(history.sessions.len(), 1);
738 assert_eq!(
739 history.sessions[0].title.as_ref().map(|s| s.as_ref()),
740 Some("Original")
741 );
742 });
743 }
744}