diff --git a/crates/agent_ui/src/thread_metadata_store.rs b/crates/agent_ui/src/thread_metadata_store.rs index ef52dd43b9ed291a5c61b18a6c667bafdb449745..c0c604b11c51bbfb14c99e565c4877fea481fa31 100644 --- a/crates/agent_ui/src/thread_metadata_store.rs +++ b/crates/agent_ui/src/thread_metadata_store.rs @@ -1,4 +1,5 @@ use std::{ + future::Future, path::{Path, PathBuf}, sync::Arc, }; @@ -201,6 +202,7 @@ pub struct ThreadMetadataStore { session_subscriptions: HashMap, pending_thread_ops_tx: smol::channel::Sender, _db_operations_task: Task<()>, + in_flight_archives: HashMap<acp::SessionId, (Task<()>, smol::channel::Sender<()>)>, } #[derive(Debug, PartialEq)] @@ -425,12 +427,34 @@ impl ThreadMetadataStore { } } - pub fn archive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) { + pub fn archive<F, Fut>( + &mut self, + session_id: &acp::SessionId, + task_builder: Option<F>, + cx: &mut Context<Self>, + ) where + F: FnOnce(smol::channel::Receiver<()>) -> Fut, + Fut: Future<Output = ()> + 'static, + { self.update_archived(session_id, true, cx); + + if let Some(task_builder) = task_builder { + let (cancel_tx, cancel_rx) = smol::channel::bounded(1); + let future = task_builder(cancel_rx); + let task = cx.foreground_executor().spawn(future); + self.in_flight_archives + .insert(session_id.clone(), (task, cancel_tx)); + } } pub fn unarchive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) { self.update_archived(session_id, false, cx); + // Dropping the Sender triggers cancellation in the background task.
+ self.in_flight_archives.remove(session_id); + } + + pub fn cleanup_completed_archive(&mut self, session_id: &acp::SessionId) { + self.in_flight_archives.remove(session_id); } pub fn complete_worktree_restore( @@ -626,6 +650,7 @@ impl ThreadMetadataStore { session_subscriptions: HashMap::default(), pending_thread_ops_tx: tx, _db_operations_task, + in_flight_archives: HashMap::default(), }; let _ = this.reload(cx); this @@ -1882,7 +1907,11 @@ mod tests { cx.update(|cx| { let store = ThreadMetadataStore::global(cx); store.update(cx, |store, cx| { - store.archive(&acp::SessionId::new("session-1"), cx); + store.archive( + &acp::SessionId::new("session-1"), + None::<fn(smol::channel::Receiver<()>) -> std::future::Ready<()>>, + cx, + ); }); }); @@ -1959,7 +1988,11 @@ cx.update(|cx| { let store = ThreadMetadataStore::global(cx); store.update(cx, |store, cx| { - store.archive(&acp::SessionId::new("session-2"), cx); + store.archive( + &acp::SessionId::new("session-2"), + None::<fn(smol::channel::Receiver<()>) -> std::future::Ready<()>>, + cx, + ); }); }); @@ -2059,7 +2092,11 @@ cx.update(|cx| { let store = ThreadMetadataStore::global(cx); store.update(cx, |store, cx| { - store.archive(&acp::SessionId::new("session-1"), cx); + store.archive( + &acp::SessionId::new("session-1"), + None::<fn(smol::channel::Receiver<()>) -> std::future::Ready<()>>, + cx, + ); }); }); @@ -2107,7 +2144,11 @@ cx.update(|cx| { let store = ThreadMetadataStore::global(cx); store.update(cx, |store, cx| { - store.archive(&acp::SessionId::new("nonexistent"), cx); + store.archive( + &acp::SessionId::new("nonexistent"), + None::<fn(smol::channel::Receiver<()>) -> std::future::Ready<()>>, + cx, + ); }); }); @@ -2136,7 +2177,11 @@ let store = ThreadMetadataStore::global(cx); store.update(cx, |store, cx| { store.save(metadata.clone(), cx); - store.archive(&session_id, cx); + store.archive( + &session_id, + None::<fn(smol::channel::Receiver<()>) -> std::future::Ready<()>>, + cx, + ); }); }); diff --git a/crates/agent_ui/src/thread_worktree_archive.rs b/crates/agent_ui/src/thread_worktree_archive.rs index
97a6053f4ce352e692e5316a84218e602ff3dea5..14c2d59803ae93fd0eb7f42d0d4936f8a9ccbda2 100644 --- a/crates/agent_ui/src/thread_worktree_archive.rs +++ b/crates/agent_ui/src/thread_worktree_archive.rs @@ -1,223 +1,52 @@ use std::{ - collections::HashSet, path::{Path, PathBuf}, sync::Arc, }; use agent_client_protocol as acp; use anyhow::{Context as _, Result, anyhow}; -use collections::HashMap; use git::repository::{AskPassDelegate, CommitOptions, ResetMode}; -use gpui::{App, AsyncApp, Entity, Global, Task, WindowHandle}; -use parking_lot::Mutex; +use gpui::{App, AsyncApp, Entity, Task, WindowHandle}; use project::{ LocalProjectFlags, Project, WorktreeId, git_store::{Repository, resolve_git_worktree_to_main_repo}, }; use util::ResultExt; -use workspace::{ - AppState, MultiWorkspace, OpenMode, OpenOptions, PathList, Toast, Workspace, - notifications::NotificationId, open_new, open_paths, -}; +use workspace::{AppState, MultiWorkspace, PathList, Workspace}; use crate::thread_metadata_store::{ArchivedGitWorktree, ThreadMetadataStore}; -#[derive(Default)] -pub struct ThreadArchiveCleanupCoordinator { - in_flight_roots: Mutex>, -} - -impl Global for ThreadArchiveCleanupCoordinator {} - -fn ensure_global(cx: &mut App) { - if !cx.has_global::() { - cx.set_global(ThreadArchiveCleanupCoordinator::default()); - } -} - -#[derive(Clone)] -pub struct ArchiveOutcome { - pub archived_immediately: bool, - pub roots_to_delete: Vec, -} - -#[derive(Clone)] -struct RootPlan { - root_path: PathBuf, - main_repo_path: PathBuf, - affected_projects: Vec, - worktree_repo: Option>, - branch_name: Option, -} - -#[derive(Clone)] -struct AffectedProject { - project: Entity, - worktree_id: WorktreeId, -} - #[derive(Clone)] -enum FallbackTarget { - ExistingWorkspace { - window: WindowHandle, - workspace: Entity, - }, - OpenPaths { - requesting_window: WindowHandle, - paths: Vec, - }, - OpenEmpty { - requesting_window: WindowHandle, - }, +pub struct RootPlan { + pub root_path: PathBuf, + pub 
main_repo_path: PathBuf, + pub affected_projects: Vec<AffectedProject>, + pub worktree_repo: Option<Entity<Repository>>, + pub branch_name: Option<String>, } #[derive(Clone)] -struct CleanupPlan { - folder_paths: PathList, - roots: Vec<RootPlan>, - current_workspace: Option<Entity<Workspace>>, - current_workspace_will_be_empty: bool, - fallback: Option<FallbackTarget>, - affected_workspaces: Vec<Entity<Workspace>>, +pub struct AffectedProject { + pub project: Entity<Project>, + pub worktree_id: WorktreeId, } fn archived_worktree_ref_name(id: i64) -> String { format!("refs/archived-worktrees/{}", id) } -struct PersistOutcome { - archived_worktree_id: i64, - staged_commit_hash: String, - } - -pub fn archive_thread( - session_id: &acp::SessionId, - current_workspace: Option<Entity<Workspace>>, - window: Option<WindowHandle<MultiWorkspace>>, - cx: &mut App, -) -> ArchiveOutcome { - ensure_global(cx); - let plan = - window.and_then(|window| build_cleanup_plan(session_id, current_workspace, window, cx)); - - ThreadMetadataStore::global(cx).update(cx, |store, cx| store.archive(session_id, cx)); - - if let Some(plan) = plan { - let roots_to_delete = plan - .roots - .iter() - .map(|root| root.root_path.clone()) - .collect::<Vec<_>>(); - if !roots_to_delete.is_empty() { - cx.spawn(async move |cx| { - run_cleanup(plan, cx).await; - }) - .detach(); - - return ArchiveOutcome { - archived_immediately: true, - roots_to_delete, - }; - } - } - - ArchiveOutcome { - archived_immediately: true, - roots_to_delete: Vec::new(), - } +pub struct PersistOutcome { + pub archived_worktree_id: i64, + pub staged_commit_hash: String, } -fn build_cleanup_plan( - session_id: &acp::SessionId, - current_workspace: Option<Entity<Workspace>>, - requesting_window: WindowHandle<MultiWorkspace>, +pub fn build_root_plan( + path: &Path, + workspaces: &[Entity<Workspace>], cx: &App, -) -> Option<CleanupPlan> { - let metadata = ThreadMetadataStore::global(cx) - .read(cx) - .entry(session_id) - .cloned()?; - - let workspaces = all_open_workspaces(cx); - - let candidate_roots = metadata - .folder_paths - .ordered_paths() - .filter_map(|path| build_root_plan(path, &workspaces, cx)) - .filter(|plan| { -
!path_is_referenced_by_other_unarchived_threads(session_id, &plan.root_path, cx) - }) - .collect::>(); - - if candidate_roots.is_empty() { - return Some(CleanupPlan { - folder_paths: metadata.folder_paths, - roots: Vec::new(), - current_workspace, - current_workspace_will_be_empty: false, - fallback: None, - affected_workspaces: Vec::new(), - }); - } - - let mut affected_workspaces = Vec::new(); - let mut current_workspace_will_be_empty = false; - - for workspace in workspaces.iter() { - let doomed_root_count = workspace - .read(cx) - .root_paths(cx) - .into_iter() - .filter(|path| { - candidate_roots - .iter() - .any(|root| root.root_path.as_path() == path.as_ref()) - }) - .count(); - - if doomed_root_count == 0 { - continue; - } - - let surviving_root_count = workspace - .read(cx) - .root_paths(cx) - .len() - .saturating_sub(doomed_root_count); - if current_workspace - .as_ref() - .is_some_and(|current| current == workspace) - { - current_workspace_will_be_empty = surviving_root_count == 0; - } - affected_workspaces.push(workspace.clone()); - } - - let fallback = if current_workspace_will_be_empty { - choose_fallback_target( - session_id, - current_workspace.as_ref(), - &candidate_roots, - &requesting_window, - &workspaces, - cx, - ) - } else { - None - }; - - Some(CleanupPlan { - folder_paths: metadata.folder_paths, - roots: candidate_roots, - current_workspace, - current_workspace_will_be_empty, - fallback, - affected_workspaces, - }) -} - -fn build_root_plan(path: &Path, workspaces: &[Entity], cx: &App) -> Option { +) -> Option { let path = path.to_path_buf(); + let affected_projects = workspaces .iter() .filter_map(|workspace| { @@ -267,7 +96,7 @@ fn build_root_plan(path: &Path, workspaces: &[Entity], cx: &App) -> O }) } -fn path_is_referenced_by_other_unarchived_threads( +pub fn path_is_referenced_by_other_unarchived_threads( current_session_id: &acp::SessionId, path: &Path, cx: &App, @@ -286,272 +115,7 @@ fn path_is_referenced_by_other_unarchived_threads( 
}) } -fn choose_fallback_target( - current_session_id: &acp::SessionId, - current_workspace: Option<&Entity>, - roots: &[RootPlan], - requesting_window: &WindowHandle, - workspaces: &[Entity], - cx: &App, -) -> Option { - let doomed_roots = roots - .iter() - .map(|root| root.root_path.clone()) - .collect::>(); - - let surviving_same_window = requesting_window.read(cx).ok().and_then(|multi_workspace| { - multi_workspace - .workspaces() - .iter() - .filter(|workspace| current_workspace.is_none_or(|current| *workspace != current)) - .find(|workspace| workspace_survives(workspace, &doomed_roots, cx)) - .cloned() - }); - if let Some(workspace) = surviving_same_window { - return Some(FallbackTarget::ExistingWorkspace { - window: *requesting_window, - workspace, - }); - } - - for window in cx - .windows() - .into_iter() - .filter_map(|window| window.downcast::()) - { - if window == *requesting_window { - continue; - } - if let Ok(multi_workspace) = window.read(cx) { - if let Some(workspace) = multi_workspace - .workspaces() - .iter() - .find(|workspace| workspace_survives(workspace, &doomed_roots, cx)) - .cloned() - { - return Some(FallbackTarget::ExistingWorkspace { window, workspace }); - } - } - } - - let safe_thread_workspace = ThreadMetadataStore::global(cx) - .read(cx) - .entries() - .filter(|metadata| metadata.session_id != *current_session_id && !metadata.archived) - .filter_map(|metadata| { - workspaces - .iter() - .find(|workspace| workspace_path_list(workspace, cx) == metadata.folder_paths) - .cloned() - }) - .find(|workspace| workspace_survives(workspace, &doomed_roots, cx)); - - if let Some(workspace) = safe_thread_workspace { - let window = window_for_workspace(&workspace, cx).unwrap_or(*requesting_window); - return Some(FallbackTarget::ExistingWorkspace { window, workspace }); - } - - if let Some(root) = roots.first() { - return Some(FallbackTarget::OpenPaths { - requesting_window: *requesting_window, - paths: vec![root.main_repo_path.clone()], - }); - } - 
- Some(FallbackTarget::OpenEmpty { - requesting_window: *requesting_window, - }) -} - -async fn run_cleanup(plan: CleanupPlan, cx: &mut AsyncApp) { - let roots_to_delete = - cx.update_global::(|coordinator, _cx| { - let mut in_flight_roots = coordinator.in_flight_roots.lock(); - plan.roots - .iter() - .filter_map(|root| { - if in_flight_roots.insert(root.root_path.clone()) { - Some(root.clone()) - } else { - None - } - }) - .collect::>() - }); - - if roots_to_delete.is_empty() { - return; - } - - let active_workspace = plan.current_workspace.clone(); - if let Some(workspace) = active_workspace - .as_ref() - .filter(|_| plan.current_workspace_will_be_empty) - { - let Some(window) = window_for_workspace_async(workspace, cx) else { - release_in_flight_roots(&roots_to_delete, cx); - return; - }; - - let should_continue = save_workspace_for_root_removal(workspace.clone(), window, cx).await; - if !should_continue { - release_in_flight_roots(&roots_to_delete, cx); - return; - } - } - - for workspace in plan - .affected_workspaces - .iter() - .filter(|workspace| Some((*workspace).clone()) != active_workspace) - { - let Some(window) = window_for_workspace_async(workspace, cx) else { - continue; - }; - - if !save_workspace_for_root_removal(workspace.clone(), window, cx).await { - release_in_flight_roots(&roots_to_delete, cx); - return; - } - } - - if plan.current_workspace_will_be_empty { - if let Some(fallback) = plan.fallback.clone() { - activate_fallback(fallback, cx).await.log_err(); - } - } - - let mut git_removal_errors: Vec<(PathBuf, anyhow::Error)> = Vec::new(); - let mut persist_errors: Vec<(PathBuf, anyhow::Error)> = Vec::new(); - let mut persist_outcomes: HashMap = HashMap::default(); - - for root in &roots_to_delete { - if root.worktree_repo.is_some() { - match persist_worktree_state(root, &plan, cx).await { - Ok(outcome) => { - persist_outcomes.insert(root.root_path.clone(), outcome); - } - Err(error) => { - log::error!( - "Failed to persist worktree state for 
{}: {error}", - root.root_path.display() - ); - persist_errors.push((root.root_path.clone(), error)); - continue; - } - } - } - - if let Err(error) = remove_root(root.clone(), cx).await { - if let Some(outcome) = persist_outcomes.remove(&root.root_path) { - rollback_persist(&outcome, root, cx).await; - } - git_removal_errors.push((root.root_path.clone(), error)); - } - } - - cleanup_empty_workspaces(&plan.affected_workspaces, cx).await; - - let all_errors: Vec<(PathBuf, anyhow::Error)> = persist_errors - .into_iter() - .chain(git_removal_errors) - .collect(); - - if !all_errors.is_empty() { - let detail = all_errors - .into_iter() - .map(|(path, error)| format!("{}: {error}", path.display())) - .collect::>() - .join("\n"); - show_error_toast( - "Thread archived, but linked worktree cleanup failed", - &detail, - &plan, - cx, - ); - } - - release_in_flight_roots(&roots_to_delete, cx); -} - -async fn save_workspace_for_root_removal( - workspace: Entity, - window: WindowHandle, - cx: &mut AsyncApp, -) -> bool { - let has_dirty_items = workspace.read_with(cx, |workspace, cx| { - workspace.items(cx).any(|item| item.is_dirty(cx)) - }); - - if has_dirty_items { - let _ = window.update(cx, |multi_workspace, window, cx| { - window.activate_window(); - multi_workspace.activate(workspace.clone(), window, cx); - }); - } - - let save_task = window.update(cx, |_multi_workspace, window, cx| { - workspace.update(cx, |workspace, cx| { - workspace.prompt_to_save_or_discard_dirty_items(window, cx) - }) - }); - - let Ok(task) = save_task else { - return false; - }; - - task.await.unwrap_or(false) -} - -async fn activate_fallback(target: FallbackTarget, cx: &mut AsyncApp) -> Result<()> { - match target { - FallbackTarget::ExistingWorkspace { window, workspace } => { - window.update(cx, |multi_workspace, window, cx| { - window.activate_window(); - multi_workspace.activate(workspace, window, cx); - })?; - } - FallbackTarget::OpenPaths { - requesting_window, - paths, - } => { - let 
app_state = current_app_state(cx).context("no workspace app state available")?; - cx.update(|cx| { - open_paths( - &paths, - app_state, - OpenOptions { - requesting_window: Some(requesting_window), - open_mode: OpenMode::Activate, - ..Default::default() - }, - cx, - ) - }) - .await?; - } - FallbackTarget::OpenEmpty { requesting_window } => { - let app_state = current_app_state(cx).context("no workspace app state available")?; - cx.update(|cx| { - open_new( - OpenOptions { - requesting_window: Some(requesting_window), - open_mode: OpenMode::Activate, - ..Default::default() - }, - app_state, - cx, - |_workspace, _window, _cx| {}, - ) - }) - .await?; - } - } - - Ok(()) -} - -async fn remove_root(root: RootPlan, cx: &mut AsyncApp) -> Result<()> { +pub async fn remove_root(root: RootPlan, cx: &mut AsyncApp) -> Result<()> { let release_tasks: Vec<_> = root .affected_projects .iter() @@ -687,9 +251,9 @@ async fn rollback_root(root: &RootPlan, cx: &mut AsyncApp) { } } -async fn persist_worktree_state( +pub async fn persist_worktree_state( root: &RootPlan, - plan: &CleanupPlan, + folder_paths: &PathList, cx: &mut AsyncApp, ) -> Result { let worktree_repo = root @@ -833,7 +397,7 @@ async fn persist_worktree_state( // Link all threads on this worktree to the archived record let session_ids: Vec = store.read_with(cx, |store, _cx| { store - .all_session_ids_for_path(&plan.folder_paths) + .all_session_ids_for_path(folder_paths) .cloned() .collect() }); @@ -900,7 +464,7 @@ async fn persist_worktree_state( }) } -async fn rollback_persist(outcome: &PersistOutcome, root: &RootPlan, cx: &mut AsyncApp) { +pub async fn rollback_persist(outcome: &PersistOutcome, root: &RootPlan, cx: &mut AsyncApp) { // Undo WIP commits on the worktree repo if let Some(worktree_repo) = &root.worktree_repo { let rx = worktree_repo.update(cx, |repo, cx| { @@ -934,34 +498,6 @@ async fn rollback_persist(outcome: &PersistOutcome, root: &RootPlan, cx: &mut As } } -async fn cleanup_empty_workspaces(workspaces: 
&[Entity], cx: &mut AsyncApp) { - for workspace in workspaces { - let is_empty = match workspace - .downgrade() - .read_with(cx, |workspace, cx| workspace.root_paths(cx).is_empty()) - { - Ok(is_empty) => is_empty, - Err(_) => { - log::debug!("Workspace entity already dropped during cleanup; skipping"); - continue; - } - }; - if !is_empty { - continue; - } - - let Some(window) = window_for_workspace_async(workspace, cx) else { - continue; - }; - - let _ = window.update(cx, |multi_workspace, window, cx| { - if !multi_workspace.remove(workspace, window, cx) { - window.remove_window(); - } - }); - } -} - pub async fn restore_worktree_via_git( row: &ArchivedGitWorktree, cx: &mut AsyncApp, @@ -1174,34 +710,7 @@ pub async fn cleanup_archived_worktree_record(row: &ArchivedGitWorktree, cx: &mu .log_err(); } -fn show_error_toast(summary: &str, detail: &str, plan: &CleanupPlan, cx: &mut AsyncApp) { - let target_workspace = plan - .current_workspace - .clone() - .or_else(|| plan.affected_workspaces.first().cloned()); - let Some(workspace) = target_workspace else { - return; - }; - - let _ = workspace.update(cx, |workspace, cx| { - struct ArchiveCleanupErrorToast; - let message = if detail.is_empty() { - summary.to_string() - } else { - format!("{summary}: {detail}") - }; - workspace.show_toast( - Toast::new( - NotificationId::unique::(), - message, - ) - .autohide(), - cx, - ); - }); -} - -fn all_open_workspaces(cx: &App) -> Vec> { +pub fn all_open_workspaces(cx: &App) -> Vec> { cx.windows() .into_iter() .filter_map(|window| window.downcast::()) @@ -1214,22 +723,6 @@ fn all_open_workspaces(cx: &App) -> Vec> { .collect() } -fn workspace_survives( - workspace: &Entity, - doomed_roots: &HashSet, - cx: &App, -) -> bool { - workspace - .read(cx) - .root_paths(cx) - .into_iter() - .any(|root| !doomed_roots.contains(root.as_ref())) -} - -fn workspace_path_list(workspace: &Entity, cx: &App) -> PathList { - PathList::new(&workspace.read(cx).root_paths(cx)) -} - fn window_for_workspace( 
workspace: &Entity, cx: &App, @@ -1261,12 +754,3 @@ fn current_app_state(cx: &mut AsyncApp) -> Option> { .map(|workspace| workspace.read(cx).app_state().clone()) }) } - -fn release_in_flight_roots(roots: &[RootPlan], cx: &mut AsyncApp) { - cx.update_global::(|coordinator, _cx| { - let mut in_flight_roots = coordinator.in_flight_roots.lock(); - for root in roots { - in_flight_roots.remove(&root.root_path); - } - }); -}