@@ -38,7 +38,8 @@ pub struct AcpThreadHistory {
visible_items: Vec<ListItemType>,
local_timezone: UtcOffset,
confirming_delete_history: bool,
- _update_task: Task<()>,
+ _visible_items_task: Task<()>,
+ _refresh_task: Task<()>,
_watch_task: Option<Task<()>>,
_subscriptions: Vec<gpui::Subscription>,
}
@@ -111,7 +112,8 @@ impl AcpThreadHistory {
search_query: SharedString::default(),
confirming_delete_history: false,
_subscriptions: vec![search_editor_subscription],
- _update_task: Task::ready(()),
+ _visible_items_task: Task::ready(()),
+ _refresh_task: Task::ready(()),
_watch_task: None,
};
this.set_session_list(session_list, cx);
@@ -131,7 +133,7 @@ impl AcpThreadHistory {
None
};
- self._update_task = cx.spawn(async move |this, cx| {
+ self._visible_items_task = cx.spawn(async move |this, cx| {
let new_visible_items = new_list_items.await;
this.update(cx, |this, cx| {
let new_selected_index = if let Some(history_entry) = selected_history_entry {
@@ -170,6 +172,8 @@ impl AcpThreadHistory {
self.sessions.clear();
self.visible_items.clear();
self.selected_index = 0;
+ self._visible_items_task = Task::ready(());
+ self._refresh_task = Task::ready(());
let Some(session_list) = self.session_list.as_ref() else {
self._watch_task = None;
@@ -179,7 +183,7 @@ impl AcpThreadHistory {
let Some(rx) = session_list.watch(cx) else {
// No watch support - do a one-time refresh
self._watch_task = None;
- self.refresh_sessions(false, cx);
+ self.refresh_sessions(false, false, cx);
return;
};
session_list.notify_refresh();
@@ -192,14 +196,13 @@ impl AcpThreadHistory {
updates.push(update);
}
- let needs_refresh = updates
- .iter()
- .any(|u| matches!(u, SessionListUpdate::Refresh));
-
this.update(cx, |this, cx| {
- // We will refresh the whole list anyway, so no need to apply incremental updates or do several refreshes
+ let needs_refresh = updates
+ .iter()
+ .any(|u| matches!(u, SessionListUpdate::Refresh));
+
if needs_refresh {
- this.refresh_sessions(true, cx);
+ this.refresh_sessions(true, false, cx);
} else {
for update in updates {
if let SessionListUpdate::SessionInfo { session_id, update } = update {
@@ -213,6 +216,10 @@ impl AcpThreadHistory {
}));
}
+ pub(crate) fn refresh_full_history(&mut self, cx: &mut Context<Self>) {
+ self.refresh_sessions(true, true, cx);
+ }
+
fn apply_info_update(
&mut self,
session_id: acp::SessionId,
@@ -254,13 +261,21 @@ impl AcpThreadHistory {
self.update_visible_items(true, cx);
}
- fn refresh_sessions(&mut self, preserve_selected_item: bool, cx: &mut Context<Self>) {
+ fn refresh_sessions(
+ &mut self,
+ preserve_selected_item: bool,
+ load_all_pages: bool,
+ cx: &mut Context<Self>,
+ ) {
let Some(session_list) = self.session_list.clone() else {
self.update_visible_items(preserve_selected_item, cx);
return;
};
- self._update_task = cx.spawn(async move |this, cx| {
+ // If a new refresh arrives while pagination is in progress, the previous
+ // `_refresh_task` is cancelled. This is intentional (latest refresh wins),
+ // but means sessions may be in a partial state until the new refresh completes.
+ self._refresh_task = cx.spawn(async move |this, cx| {
let mut cursor: Option<String> = None;
let mut is_first_page = true;
@@ -295,6 +310,10 @@ impl AcpThreadHistory {
.ok();
is_first_page = false;
+ if !load_all_pages {
+ break;
+ }
+
match next_cursor {
Some(next_cursor) => {
if cursor.as_ref() == Some(&next_cursor) {
@@ -1050,7 +1069,10 @@ mod tests {
use acp_thread::AgentSessionListResponse;
use chrono::NaiveDate;
use gpui::TestAppContext;
- use std::any::Any;
+ use std::{
+ any::Any,
+ sync::{Arc, Mutex},
+ };
fn init_test(cx: &mut TestAppContext) {
cx.update(|cx| {
@@ -1104,6 +1126,307 @@ mod tests {
}
}
+ #[derive(Clone)]
+ struct PaginatedTestSessionList {
+ first_page_sessions: Vec<AgentSessionInfo>,
+ second_page_sessions: Vec<AgentSessionInfo>,
+ requested_cursors: Arc<Mutex<Vec<Option<String>>>>,
+ async_responses: bool,
+ updates_tx: smol::channel::Sender<SessionListUpdate>,
+ updates_rx: smol::channel::Receiver<SessionListUpdate>,
+ }
+
+ impl PaginatedTestSessionList {
+ fn new(
+ first_page_sessions: Vec<AgentSessionInfo>,
+ second_page_sessions: Vec<AgentSessionInfo>,
+ ) -> Self {
+ let (tx, rx) = smol::channel::unbounded();
+ Self {
+ first_page_sessions,
+ second_page_sessions,
+ requested_cursors: Arc::new(Mutex::new(Vec::new())),
+ async_responses: false,
+ updates_tx: tx,
+ updates_rx: rx,
+ }
+ }
+
+ fn with_async_responses(mut self) -> Self {
+ self.async_responses = true;
+ self
+ }
+
+ fn requested_cursors(&self) -> Vec<Option<String>> {
+ self.requested_cursors.lock().unwrap().clone()
+ }
+
+ fn clear_requested_cursors(&self) {
+ self.requested_cursors.lock().unwrap().clear()
+ }
+
+ fn send_update(&self, update: SessionListUpdate) {
+ self.updates_tx.try_send(update).ok();
+ }
+ }
+
+ impl AgentSessionList for PaginatedTestSessionList {
+ fn list_sessions(
+ &self,
+ request: AgentSessionListRequest,
+ cx: &mut App,
+ ) -> Task<anyhow::Result<AgentSessionListResponse>> {
+ let requested_cursors = self.requested_cursors.clone();
+ let first_page_sessions = self.first_page_sessions.clone();
+ let second_page_sessions = self.second_page_sessions.clone();
+
+ let respond = move || {
+ requested_cursors
+ .lock()
+ .unwrap()
+ .push(request.cursor.clone());
+
+ match request.cursor.as_deref() {
+ None => AgentSessionListResponse {
+ sessions: first_page_sessions,
+ next_cursor: Some("page-2".to_string()),
+ meta: None,
+ },
+ Some("page-2") => AgentSessionListResponse::new(second_page_sessions),
+ _ => AgentSessionListResponse::new(Vec::new()),
+ }
+ };
+
+ if self.async_responses {
+ cx.foreground_executor().spawn(async move {
+ smol::future::yield_now().await;
+ Ok(respond())
+ })
+ } else {
+ Task::ready(Ok(respond()))
+ }
+ }
+
+ fn watch(&self, _cx: &mut App) -> Option<smol::channel::Receiver<SessionListUpdate>> {
+ Some(self.updates_rx.clone())
+ }
+
+ fn notify_refresh(&self) {
+ self.send_update(SessionListUpdate::Refresh);
+ }
+
+ fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
+ self
+ }
+ }
+
+ fn test_session(session_id: &str, title: &str) -> AgentSessionInfo {
+ AgentSessionInfo {
+ session_id: acp::SessionId::new(session_id),
+ cwd: None,
+ title: Some(title.to_string().into()),
+ updated_at: None,
+ meta: None,
+ }
+ }
+
+ #[gpui::test]
+ async fn test_refresh_only_loads_first_page_by_default(cx: &mut TestAppContext) {
+ init_test(cx);
+
+ let session_list = Rc::new(PaginatedTestSessionList::new(
+ vec![test_session("session-1", "First")],
+ vec![test_session("session-2", "Second")],
+ ));
+
+ let (history, cx) = cx.add_window_view(|window, cx| {
+ AcpThreadHistory::new(Some(session_list.clone()), window, cx)
+ });
+ cx.run_until_parked();
+
+ history.update(cx, |history, _cx| {
+ assert_eq!(history.sessions.len(), 1);
+ assert_eq!(
+ history.sessions[0].session_id,
+ acp::SessionId::new("session-1")
+ );
+ });
+ assert_eq!(session_list.requested_cursors(), vec![None]);
+ }
+
+ #[gpui::test]
+ async fn test_enabling_full_pagination_loads_all_pages(cx: &mut TestAppContext) {
+ init_test(cx);
+
+ let session_list = Rc::new(PaginatedTestSessionList::new(
+ vec![test_session("session-1", "First")],
+ vec![test_session("session-2", "Second")],
+ ));
+
+ let (history, cx) = cx.add_window_view(|window, cx| {
+ AcpThreadHistory::new(Some(session_list.clone()), window, cx)
+ });
+ cx.run_until_parked();
+ session_list.clear_requested_cursors();
+
+ history.update(cx, |history, cx| history.refresh_full_history(cx));
+ cx.run_until_parked();
+
+ history.update(cx, |history, _cx| {
+ assert_eq!(history.sessions.len(), 2);
+ assert_eq!(
+ history.sessions[0].session_id,
+ acp::SessionId::new("session-1")
+ );
+ assert_eq!(
+ history.sessions[1].session_id,
+ acp::SessionId::new("session-2")
+ );
+ });
+ assert_eq!(
+ session_list.requested_cursors(),
+ vec![None, Some("page-2".to_string())]
+ );
+ }
+
+ #[gpui::test]
+ async fn test_standard_refresh_replaces_with_first_page_after_full_history_refresh(
+ cx: &mut TestAppContext,
+ ) {
+ init_test(cx);
+
+ let session_list = Rc::new(PaginatedTestSessionList::new(
+ vec![test_session("session-1", "First")],
+ vec![test_session("session-2", "Second")],
+ ));
+
+ let (history, cx) = cx.add_window_view(|window, cx| {
+ AcpThreadHistory::new(Some(session_list.clone()), window, cx)
+ });
+ cx.run_until_parked();
+
+ history.update(cx, |history, cx| history.refresh_full_history(cx));
+ cx.run_until_parked();
+ session_list.clear_requested_cursors();
+
+ history.update(cx, |history, cx| {
+ history.refresh(cx);
+ });
+ cx.run_until_parked();
+
+ history.update(cx, |history, _cx| {
+ assert_eq!(history.sessions.len(), 1);
+ assert_eq!(
+ history.sessions[0].session_id,
+ acp::SessionId::new("session-1")
+ );
+ });
+ assert_eq!(session_list.requested_cursors(), vec![None]);
+ }
+
+ #[gpui::test]
+ async fn test_re_entering_full_pagination_reloads_all_pages(cx: &mut TestAppContext) {
+ init_test(cx);
+
+ let session_list = Rc::new(PaginatedTestSessionList::new(
+ vec![test_session("session-1", "First")],
+ vec![test_session("session-2", "Second")],
+ ));
+
+ let (history, cx) = cx.add_window_view(|window, cx| {
+ AcpThreadHistory::new(Some(session_list.clone()), window, cx)
+ });
+ cx.run_until_parked();
+
+ history.update(cx, |history, cx| history.refresh_full_history(cx));
+ cx.run_until_parked();
+ session_list.clear_requested_cursors();
+
+ history.update(cx, |history, cx| history.refresh_full_history(cx));
+ cx.run_until_parked();
+
+ history.update(cx, |history, _cx| {
+ assert_eq!(history.sessions.len(), 2);
+ });
+ assert_eq!(
+ session_list.requested_cursors(),
+ vec![None, Some("page-2".to_string())]
+ );
+ }
+
+ #[gpui::test]
+ async fn test_partial_refresh_batch_drops_non_first_page_sessions(cx: &mut TestAppContext) {
+ init_test(cx);
+
+ let second_page_session_id = acp::SessionId::new("session-2");
+ let session_list = Rc::new(PaginatedTestSessionList::new(
+ vec![test_session("session-1", "First")],
+ vec![test_session("session-2", "Second")],
+ ));
+
+ let (history, cx) = cx.add_window_view(|window, cx| {
+ AcpThreadHistory::new(Some(session_list.clone()), window, cx)
+ });
+ cx.run_until_parked();
+
+ history.update(cx, |history, cx| history.refresh_full_history(cx));
+ cx.run_until_parked();
+
+ session_list.clear_requested_cursors();
+
+ session_list.send_update(SessionListUpdate::SessionInfo {
+ session_id: second_page_session_id.clone(),
+ update: acp::SessionInfoUpdate::new().title("Updated Second"),
+ });
+ session_list.send_update(SessionListUpdate::Refresh);
+ cx.run_until_parked();
+
+ history.update(cx, |history, _cx| {
+ assert_eq!(history.sessions.len(), 1);
+ assert_eq!(
+ history.sessions[0].session_id,
+ acp::SessionId::new("session-1")
+ );
+ assert!(
+ history
+ .sessions
+ .iter()
+ .all(|session| session.session_id != second_page_session_id)
+ );
+ });
+ assert_eq!(session_list.requested_cursors(), vec![None]);
+ }
+
+ #[gpui::test]
+ async fn test_full_pagination_works_with_async_page_fetches(cx: &mut TestAppContext) {
+ init_test(cx);
+
+ let session_list = Rc::new(
+ PaginatedTestSessionList::new(
+ vec![test_session("session-1", "First")],
+ vec![test_session("session-2", "Second")],
+ )
+ .with_async_responses(),
+ );
+
+ let (history, cx) = cx.add_window_view(|window, cx| {
+ AcpThreadHistory::new(Some(session_list.clone()), window, cx)
+ });
+ cx.run_until_parked();
+ session_list.clear_requested_cursors();
+
+ history.update(cx, |history, cx| history.refresh_full_history(cx));
+ cx.run_until_parked();
+
+ history.update(cx, |history, _cx| {
+ assert_eq!(history.sessions.len(), 2);
+ });
+ assert_eq!(
+ session_list.requested_cursors(),
+ vec![None, Some("page-2".to_string())]
+ );
+ }
+
#[gpui::test]
async fn test_apply_info_update_title(cx: &mut TestAppContext) {
init_test(cx);