thread_history.rs

  1use acp_thread::{AgentSessionInfo, AgentSessionList, AgentSessionListRequest, SessionListUpdate};
  2use agent_client_protocol as acp;
  3use gpui::Task;
  4use std::rc::Rc;
  5use ui::prelude::*;
  6
  7pub struct ThreadHistory {
  8    session_list: Rc<dyn AgentSessionList>,
  9    sessions: Vec<AgentSessionInfo>,
 10    _refresh_task: Task<()>,
 11    _watch_task: Option<Task<()>>,
 12}
 13
 14impl ThreadHistory {
 15    pub fn new(session_list: Rc<dyn AgentSessionList>, cx: &mut Context<Self>) -> Self {
 16        let mut this = Self {
 17            session_list,
 18            sessions: Vec::new(),
 19            _refresh_task: Task::ready(()),
 20            _watch_task: None,
 21        };
 22
 23        this.start_watching(cx);
 24        this
 25    }
 26
 27    #[cfg(any(test, feature = "test-support"))]
 28    pub fn set_session_list(
 29        &mut self,
 30        session_list: Rc<dyn AgentSessionList>,
 31        cx: &mut Context<Self>,
 32    ) {
 33        if Rc::ptr_eq(&self.session_list, &session_list) {
 34            return;
 35        }
 36
 37        self.session_list = session_list;
 38        self.sessions.clear();
 39        self._refresh_task = Task::ready(());
 40        self.start_watching(cx);
 41    }
 42
 43    fn start_watching(&mut self, cx: &mut Context<Self>) {
 44        let Some(rx) = self.session_list.watch(cx) else {
 45            self._watch_task = None;
 46            self.refresh_sessions(false, cx);
 47            return;
 48        };
 49        self.session_list.notify_refresh();
 50
 51        self._watch_task = Some(cx.spawn(async move |this, cx| {
 52            while let Ok(first_update) = rx.recv().await {
 53                let mut updates = vec![first_update];
 54                while let Ok(update) = rx.try_recv() {
 55                    updates.push(update);
 56                }
 57
 58                this.update(cx, |this, cx| {
 59                    let needs_refresh = updates
 60                        .iter()
 61                        .any(|u| matches!(u, SessionListUpdate::Refresh));
 62
 63                    if needs_refresh {
 64                        this.refresh_sessions(false, cx);
 65                    } else {
 66                        for update in updates {
 67                            if let SessionListUpdate::SessionInfo { session_id, update } = update {
 68                                this.apply_info_update(session_id, update, cx);
 69                            }
 70                        }
 71                    }
 72                })
 73                .ok();
 74            }
 75        }));
 76    }
 77
 78    fn apply_info_update(
 79        &mut self,
 80        session_id: acp::SessionId,
 81        info_update: acp::SessionInfoUpdate,
 82        cx: &mut Context<Self>,
 83    ) {
 84        let Some(session) = self
 85            .sessions
 86            .iter_mut()
 87            .find(|s| s.session_id == session_id)
 88        else {
 89            return;
 90        };
 91
 92        match info_update.title {
 93            acp::MaybeUndefined::Value(title) => {
 94                session.title = Some(title.into());
 95            }
 96            acp::MaybeUndefined::Null => {
 97                session.title = None;
 98            }
 99            acp::MaybeUndefined::Undefined => {}
100        }
101        match info_update.updated_at {
102            acp::MaybeUndefined::Value(date_str) => {
103                if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&date_str) {
104                    session.updated_at = Some(dt.with_timezone(&chrono::Utc));
105                }
106            }
107            acp::MaybeUndefined::Null => {
108                session.updated_at = None;
109            }
110            acp::MaybeUndefined::Undefined => {}
111        }
112        if let Some(meta) = info_update.meta {
113            session.meta = Some(meta);
114        }
115
116        cx.notify();
117    }
118
119    fn refresh_sessions(&mut self, load_all_pages: bool, cx: &mut Context<Self>) {
120        let session_list = self.session_list.clone();
121
122        self._refresh_task = cx.spawn(async move |this, cx| {
123            let mut cursor: Option<String> = None;
124            let mut is_first_page = true;
125
126            loop {
127                let request = AgentSessionListRequest {
128                    cursor: cursor.clone(),
129                    ..Default::default()
130                };
131                let task = cx.update(|cx| session_list.list_sessions(request, cx));
132                let response = match task.await {
133                    Ok(response) => response,
134                    Err(error) => {
135                        log::error!("Failed to load session history: {error:#}");
136                        return;
137                    }
138                };
139
140                let acp_thread::AgentSessionListResponse {
141                    sessions: page_sessions,
142                    next_cursor,
143                    ..
144                } = response;
145
146                this.update(cx, |this, cx| {
147                    if is_first_page {
148                        this.sessions = page_sessions;
149                    } else {
150                        this.sessions.extend(page_sessions);
151                    }
152                    cx.notify();
153                })
154                .ok();
155
156                is_first_page = false;
157                if !load_all_pages {
158                    break;
159                }
160
161                match next_cursor {
162                    Some(next_cursor) => {
163                        if cursor.as_ref() == Some(&next_cursor) {
164                            log::warn!(
165                                "Session list pagination returned the same cursor; stopping to avoid a loop."
166                            );
167                            break;
168                        }
169                        cursor = Some(next_cursor);
170                    }
171                    None => break,
172                }
173            }
174        });
175    }
176
177    pub fn refresh(&mut self, _cx: &mut Context<Self>) {
178        self.session_list.notify_refresh();
179    }
180
181    pub fn session_for_id(&self, session_id: &acp::SessionId) -> Option<AgentSessionInfo> {
182        self.sessions
183            .iter()
184            .find(|entry| &entry.session_id == session_id)
185            .cloned()
186    }
187
188    pub(crate) fn sessions(&self) -> &[AgentSessionInfo] {
189        &self.sessions
190    }
191}
192
193#[cfg(test)]
194mod tests {
195    use super::*;
196    use acp_thread::AgentSessionListResponse;
197    use gpui::TestAppContext;
198    use std::{
199        any::Any,
200        sync::{Arc, Mutex},
201    };
202
203    fn init_test(cx: &mut TestAppContext) {
204        cx.update(|cx| {
205            let settings_store = settings::SettingsStore::test(cx);
206            cx.set_global(settings_store);
207            theme_settings::init(theme::LoadThemes::JustBase, cx);
208        });
209    }
210
211    #[derive(Clone)]
212    struct TestSessionList {
213        sessions: Vec<AgentSessionInfo>,
214        updates_tx: smol::channel::Sender<SessionListUpdate>,
215        updates_rx: smol::channel::Receiver<SessionListUpdate>,
216    }
217
218    impl TestSessionList {
219        fn new(sessions: Vec<AgentSessionInfo>) -> Self {
220            let (tx, rx) = smol::channel::unbounded();
221            Self {
222                sessions,
223                updates_tx: tx,
224                updates_rx: rx,
225            }
226        }
227
228        fn send_update(&self, update: SessionListUpdate) {
229            self.updates_tx.try_send(update).ok();
230        }
231    }
232
233    impl AgentSessionList for TestSessionList {
234        fn list_sessions(
235            &self,
236            _request: AgentSessionListRequest,
237            _cx: &mut App,
238        ) -> Task<anyhow::Result<AgentSessionListResponse>> {
239            Task::ready(Ok(AgentSessionListResponse::new(self.sessions.clone())))
240        }
241
242        fn watch(&self, _cx: &mut App) -> Option<smol::channel::Receiver<SessionListUpdate>> {
243            Some(self.updates_rx.clone())
244        }
245
246        fn notify_refresh(&self) {
247            self.send_update(SessionListUpdate::Refresh);
248        }
249
250        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
251            self
252        }
253    }
254
255    #[derive(Clone)]
256    struct PaginatedTestSessionList {
257        first_page_sessions: Vec<AgentSessionInfo>,
258        second_page_sessions: Vec<AgentSessionInfo>,
259        requested_cursors: Arc<Mutex<Vec<Option<String>>>>,
260        async_responses: bool,
261        updates_tx: smol::channel::Sender<SessionListUpdate>,
262        updates_rx: smol::channel::Receiver<SessionListUpdate>,
263    }
264
265    impl PaginatedTestSessionList {
266        fn new(
267            first_page_sessions: Vec<AgentSessionInfo>,
268            second_page_sessions: Vec<AgentSessionInfo>,
269        ) -> Self {
270            let (tx, rx) = smol::channel::unbounded();
271            Self {
272                first_page_sessions,
273                second_page_sessions,
274                requested_cursors: Arc::new(Mutex::new(Vec::new())),
275                async_responses: false,
276                updates_tx: tx,
277                updates_rx: rx,
278            }
279        }
280
281        fn with_async_responses(mut self) -> Self {
282            self.async_responses = true;
283            self
284        }
285
286        fn requested_cursors(&self) -> Vec<Option<String>> {
287            self.requested_cursors.lock().unwrap().clone()
288        }
289
290        fn clear_requested_cursors(&self) {
291            self.requested_cursors.lock().unwrap().clear()
292        }
293
294        fn send_update(&self, update: SessionListUpdate) {
295            self.updates_tx.try_send(update).ok();
296        }
297    }
298
299    impl AgentSessionList for PaginatedTestSessionList {
300        fn list_sessions(
301            &self,
302            request: AgentSessionListRequest,
303            cx: &mut App,
304        ) -> Task<anyhow::Result<AgentSessionListResponse>> {
305            let requested_cursors = self.requested_cursors.clone();
306            let first_page_sessions = self.first_page_sessions.clone();
307            let second_page_sessions = self.second_page_sessions.clone();
308
309            let respond = move || {
310                requested_cursors
311                    .lock()
312                    .unwrap()
313                    .push(request.cursor.clone());
314
315                match request.cursor.as_deref() {
316                    None => AgentSessionListResponse {
317                        sessions: first_page_sessions,
318                        next_cursor: Some("page-2".to_string()),
319                        meta: None,
320                    },
321                    Some("page-2") => AgentSessionListResponse::new(second_page_sessions),
322                    _ => AgentSessionListResponse::new(Vec::new()),
323                }
324            };
325
326            if self.async_responses {
327                cx.foreground_executor().spawn(async move {
328                    smol::future::yield_now().await;
329                    Ok(respond())
330                })
331            } else {
332                Task::ready(Ok(respond()))
333            }
334        }
335
336        fn watch(&self, _cx: &mut App) -> Option<smol::channel::Receiver<SessionListUpdate>> {
337            Some(self.updates_rx.clone())
338        }
339
340        fn notify_refresh(&self) {
341            self.send_update(SessionListUpdate::Refresh);
342        }
343
344        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
345            self
346        }
347    }
348
349    fn test_session(session_id: &str, title: &str) -> AgentSessionInfo {
350        AgentSessionInfo {
351            session_id: acp::SessionId::new(session_id),
352            work_dirs: None,
353            title: Some(title.to_string().into()),
354            updated_at: None,
355            created_at: None,
356            meta: None,
357        }
358    }
359
360    #[gpui::test]
361    async fn test_refresh_only_loads_first_page_by_default(cx: &mut TestAppContext) {
362        init_test(cx);
363
364        let session_list = Rc::new(PaginatedTestSessionList::new(
365            vec![test_session("session-1", "First")],
366            vec![test_session("session-2", "Second")],
367        ));
368
369        let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
370        cx.run_until_parked();
371
372        history.update(cx, |history, _cx| {
373            assert_eq!(history.sessions.len(), 1);
374            assert_eq!(
375                history.sessions[0].session_id,
376                acp::SessionId::new("session-1")
377            );
378        });
379        assert_eq!(session_list.requested_cursors(), vec![None]);
380    }
381
382    #[gpui::test]
383    async fn test_enabling_full_pagination_loads_all_pages(cx: &mut TestAppContext) {
384        init_test(cx);
385
386        let session_list = Rc::new(PaginatedTestSessionList::new(
387            vec![test_session("session-1", "First")],
388            vec![test_session("session-2", "Second")],
389        ));
390
391        let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
392        cx.run_until_parked();
393        session_list.clear_requested_cursors();
394
395        history.update(cx, |history, cx| history.refresh_sessions(true, cx));
396        cx.run_until_parked();
397
398        history.update(cx, |history, _cx| {
399            assert_eq!(history.sessions.len(), 2);
400            assert_eq!(
401                history.sessions[0].session_id,
402                acp::SessionId::new("session-1")
403            );
404            assert_eq!(
405                history.sessions[1].session_id,
406                acp::SessionId::new("session-2")
407            );
408        });
409        assert_eq!(
410            session_list.requested_cursors(),
411            vec![None, Some("page-2".to_string())]
412        );
413    }
414
415    #[gpui::test]
416    async fn test_standard_refresh_replaces_with_first_page_after_full_history_refresh(
417        cx: &mut TestAppContext,
418    ) {
419        init_test(cx);
420
421        let session_list = Rc::new(PaginatedTestSessionList::new(
422            vec![test_session("session-1", "First")],
423            vec![test_session("session-2", "Second")],
424        ));
425
426        let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
427        cx.run_until_parked();
428
429        history.update(cx, |history, cx| history.refresh_sessions(true, cx));
430        cx.run_until_parked();
431        session_list.clear_requested_cursors();
432
433        history.update(cx, |history, cx| {
434            history.refresh(cx);
435        });
436        cx.run_until_parked();
437
438        history.update(cx, |history, _cx| {
439            assert_eq!(history.sessions.len(), 1);
440            assert_eq!(
441                history.sessions[0].session_id,
442                acp::SessionId::new("session-1")
443            );
444        });
445        assert_eq!(session_list.requested_cursors(), vec![None]);
446    }
447
448    #[gpui::test]
449    async fn test_re_entering_full_pagination_reloads_all_pages(cx: &mut TestAppContext) {
450        init_test(cx);
451
452        let session_list = Rc::new(PaginatedTestSessionList::new(
453            vec![test_session("session-1", "First")],
454            vec![test_session("session-2", "Second")],
455        ));
456
457        let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
458        cx.run_until_parked();
459
460        history.update(cx, |history, cx| history.refresh_sessions(true, cx));
461        cx.run_until_parked();
462        session_list.clear_requested_cursors();
463
464        history.update(cx, |history, cx| history.refresh_sessions(true, cx));
465        cx.run_until_parked();
466
467        history.update(cx, |history, _cx| {
468            assert_eq!(history.sessions.len(), 2);
469        });
470        assert_eq!(
471            session_list.requested_cursors(),
472            vec![None, Some("page-2".to_string())]
473        );
474    }
475
476    #[gpui::test]
477    async fn test_partial_refresh_batch_drops_non_first_page_sessions(cx: &mut TestAppContext) {
478        init_test(cx);
479
480        let second_page_session_id = acp::SessionId::new("session-2");
481        let session_list = Rc::new(PaginatedTestSessionList::new(
482            vec![test_session("session-1", "First")],
483            vec![test_session("session-2", "Second")],
484        ));
485
486        let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
487        cx.run_until_parked();
488
489        history.update(cx, |history, cx| history.refresh_sessions(true, cx));
490        cx.run_until_parked();
491
492        session_list.clear_requested_cursors();
493
494        session_list.send_update(SessionListUpdate::SessionInfo {
495            session_id: second_page_session_id.clone(),
496            update: acp::SessionInfoUpdate::new().title("Updated Second"),
497        });
498        session_list.send_update(SessionListUpdate::Refresh);
499        cx.run_until_parked();
500
501        history.update(cx, |history, _cx| {
502            assert_eq!(history.sessions.len(), 1);
503            assert_eq!(
504                history.sessions[0].session_id,
505                acp::SessionId::new("session-1")
506            );
507            assert!(
508                history
509                    .sessions
510                    .iter()
511                    .all(|session| session.session_id != second_page_session_id)
512            );
513        });
514        assert_eq!(session_list.requested_cursors(), vec![None]);
515    }
516
517    #[gpui::test]
518    async fn test_full_pagination_works_with_async_page_fetches(cx: &mut TestAppContext) {
519        init_test(cx);
520
521        let session_list = Rc::new(
522            PaginatedTestSessionList::new(
523                vec![test_session("session-1", "First")],
524                vec![test_session("session-2", "Second")],
525            )
526            .with_async_responses(),
527        );
528
529        let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
530        cx.run_until_parked();
531        session_list.clear_requested_cursors();
532
533        history.update(cx, |history, cx| history.refresh_sessions(true, cx));
534        cx.run_until_parked();
535
536        history.update(cx, |history, _cx| {
537            assert_eq!(history.sessions.len(), 2);
538        });
539        assert_eq!(
540            session_list.requested_cursors(),
541            vec![None, Some("page-2".to_string())]
542        );
543    }
544
545    #[gpui::test]
546    async fn test_apply_info_update_title(cx: &mut TestAppContext) {
547        init_test(cx);
548
549        let session_id = acp::SessionId::new("test-session");
550        let sessions = vec![AgentSessionInfo {
551            session_id: session_id.clone(),
552            work_dirs: None,
553            title: Some("Original Title".into()),
554            updated_at: None,
555            created_at: None,
556            meta: None,
557        }];
558        let session_list = Rc::new(TestSessionList::new(sessions));
559
560        let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
561        cx.run_until_parked();
562
563        session_list.send_update(SessionListUpdate::SessionInfo {
564            session_id: session_id.clone(),
565            update: acp::SessionInfoUpdate::new().title("New Title"),
566        });
567        cx.run_until_parked();
568
569        history.update(cx, |history, _cx| {
570            let session = history.sessions.iter().find(|s| s.session_id == session_id);
571            assert_eq!(
572                session.unwrap().title.as_ref().map(|s| s.as_ref()),
573                Some("New Title")
574            );
575        });
576    }
577
578    #[gpui::test]
579    async fn test_apply_info_update_clears_title_with_null(cx: &mut TestAppContext) {
580        init_test(cx);
581
582        let session_id = acp::SessionId::new("test-session");
583        let sessions = vec![AgentSessionInfo {
584            session_id: session_id.clone(),
585            work_dirs: None,
586            title: Some("Original Title".into()),
587            updated_at: None,
588            created_at: None,
589            meta: None,
590        }];
591        let session_list = Rc::new(TestSessionList::new(sessions));
592
593        let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
594        cx.run_until_parked();
595
596        session_list.send_update(SessionListUpdate::SessionInfo {
597            session_id: session_id.clone(),
598            update: acp::SessionInfoUpdate::new().title(None::<String>),
599        });
600        cx.run_until_parked();
601
602        history.update(cx, |history, _cx| {
603            let session = history.sessions.iter().find(|s| s.session_id == session_id);
604            assert_eq!(session.unwrap().title, None);
605        });
606    }
607
608    #[gpui::test]
609    async fn test_apply_info_update_ignores_undefined_fields(cx: &mut TestAppContext) {
610        init_test(cx);
611
612        let session_id = acp::SessionId::new("test-session");
613        let sessions = vec![AgentSessionInfo {
614            session_id: session_id.clone(),
615            work_dirs: None,
616            title: Some("Original Title".into()),
617            updated_at: None,
618            created_at: None,
619            meta: None,
620        }];
621        let session_list = Rc::new(TestSessionList::new(sessions));
622
623        let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
624        cx.run_until_parked();
625
626        session_list.send_update(SessionListUpdate::SessionInfo {
627            session_id: session_id.clone(),
628            update: acp::SessionInfoUpdate::new(),
629        });
630        cx.run_until_parked();
631
632        history.update(cx, |history, _cx| {
633            let session = history.sessions.iter().find(|s| s.session_id == session_id);
634            assert_eq!(
635                session.unwrap().title.as_ref().map(|s| s.as_ref()),
636                Some("Original Title")
637            );
638        });
639    }
640
641    #[gpui::test]
642    async fn test_multiple_info_updates_applied_in_order(cx: &mut TestAppContext) {
643        init_test(cx);
644
645        let session_id = acp::SessionId::new("test-session");
646        let sessions = vec![AgentSessionInfo {
647            session_id: session_id.clone(),
648            work_dirs: None,
649            title: None,
650            updated_at: None,
651            created_at: None,
652            meta: None,
653        }];
654        let session_list = Rc::new(TestSessionList::new(sessions));
655
656        let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
657        cx.run_until_parked();
658
659        session_list.send_update(SessionListUpdate::SessionInfo {
660            session_id: session_id.clone(),
661            update: acp::SessionInfoUpdate::new().title("First Title"),
662        });
663        session_list.send_update(SessionListUpdate::SessionInfo {
664            session_id: session_id.clone(),
665            update: acp::SessionInfoUpdate::new().title("Second Title"),
666        });
667        cx.run_until_parked();
668
669        history.update(cx, |history, _cx| {
670            let session = history.sessions.iter().find(|s| s.session_id == session_id);
671            assert_eq!(
672                session.unwrap().title.as_ref().map(|s| s.as_ref()),
673                Some("Second Title")
674            );
675        });
676    }
677
678    #[gpui::test]
679    async fn test_refresh_supersedes_info_updates(cx: &mut TestAppContext) {
680        init_test(cx);
681
682        let session_id = acp::SessionId::new("test-session");
683        let sessions = vec![AgentSessionInfo {
684            session_id: session_id.clone(),
685            work_dirs: None,
686            title: Some("Server Title".into()),
687            updated_at: None,
688            created_at: None,
689            meta: None,
690        }];
691        let session_list = Rc::new(TestSessionList::new(sessions));
692
693        let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
694        cx.run_until_parked();
695
696        session_list.send_update(SessionListUpdate::SessionInfo {
697            session_id: session_id.clone(),
698            update: acp::SessionInfoUpdate::new().title("Local Update"),
699        });
700        session_list.send_update(SessionListUpdate::Refresh);
701        cx.run_until_parked();
702
703        history.update(cx, |history, _cx| {
704            let session = history.sessions.iter().find(|s| s.session_id == session_id);
705            assert_eq!(
706                session.unwrap().title.as_ref().map(|s| s.as_ref()),
707                Some("Server Title")
708            );
709        });
710    }
711
712    #[gpui::test]
713    async fn test_info_update_for_unknown_session_is_ignored(cx: &mut TestAppContext) {
714        init_test(cx);
715
716        let session_id = acp::SessionId::new("known-session");
717        let sessions = vec![AgentSessionInfo {
718            session_id,
719            work_dirs: None,
720            title: Some("Original".into()),
721            updated_at: None,
722            created_at: None,
723            meta: None,
724        }];
725        let session_list = Rc::new(TestSessionList::new(sessions));
726
727        let history = cx.new(|cx| ThreadHistory::new(session_list.clone(), cx));
728        cx.run_until_parked();
729
730        session_list.send_update(SessionListUpdate::SessionInfo {
731            session_id: acp::SessionId::new("unknown-session"),
732            update: acp::SessionInfoUpdate::new().title("Should Be Ignored"),
733        });
734        cx.run_until_parked();
735
736        history.update(cx, |history, _cx| {
737            assert_eq!(history.sessions.len(), 1);
738            assert_eq!(
739                history.sessions[0].title.as_ref().map(|s| s.as_ref()),
740                Some("Original")
741            );
742        });
743    }
744}