@@ -1,4 +1,5 @@
use std::{
+ future::Future,
path::{Path, PathBuf},
sync::Arc,
};
@@ -201,6 +202,7 @@ pub struct ThreadMetadataStore {
session_subscriptions: HashMap<acp::SessionId, Subscription>,
pending_thread_ops_tx: smol::channel::Sender<DbOperation>,
_db_operations_task: Task<()>,
+ in_flight_archives: HashMap<acp::SessionId, (Task<()>, smol::channel::Sender<()>)>,
}
#[derive(Debug, PartialEq)]
@@ -425,12 +427,34 @@ impl ThreadMetadataStore {
}
}
- pub fn archive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) {
+ pub fn archive<F, Fut>(
+ &mut self,
+ session_id: &acp::SessionId,
+ task_builder: Option<F>,
+ cx: &mut Context<Self>,
+ ) where
+ F: FnOnce(smol::channel::Receiver<()>) -> Fut,
+ Fut: Future<Output = ()> + 'static,
+ {
self.update_archived(session_id, true, cx);
+
+ if let Some(task_builder) = task_builder {
+ let (cancel_tx, cancel_rx) = smol::channel::bounded(1);
+ let future = task_builder(cancel_rx);
+ let task = cx.foreground_executor().spawn(future);
+ self.in_flight_archives
+ .insert(session_id.clone(), (task, cancel_tx));
+ }
}
pub fn unarchive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) {
self.update_archived(session_id, false, cx);
+ // Dropping the Sender triggers cancellation in the background task.
+ self.in_flight_archives.remove(session_id);
+ }
+
+ pub fn cleanup_completed_archive(&mut self, session_id: &acp::SessionId) {
+ self.in_flight_archives.remove(session_id);
}
pub fn complete_worktree_restore(
@@ -626,6 +650,7 @@ impl ThreadMetadataStore {
session_subscriptions: HashMap::default(),
pending_thread_ops_tx: tx,
_db_operations_task,
+ in_flight_archives: HashMap::default(),
};
let _ = this.reload(cx);
this
@@ -1882,7 +1907,11 @@ mod tests {
cx.update(|cx| {
let store = ThreadMetadataStore::global(cx);
store.update(cx, |store, cx| {
- store.archive(&acp::SessionId::new("session-1"), cx);
+ store.archive(
+ &acp::SessionId::new("session-1"),
+ None::<fn(smol::channel::Receiver<()>) -> std::future::Ready<()>>,
+ cx,
+ );
});
});
@@ -1959,7 +1988,11 @@ mod tests {
cx.update(|cx| {
let store = ThreadMetadataStore::global(cx);
store.update(cx, |store, cx| {
- store.archive(&acp::SessionId::new("session-2"), cx);
+ store.archive(
+ &acp::SessionId::new("session-2"),
+ None::<fn(smol::channel::Receiver<()>) -> std::future::Ready<()>>,
+ cx,
+ );
});
});
@@ -2059,7 +2092,11 @@ mod tests {
cx.update(|cx| {
let store = ThreadMetadataStore::global(cx);
store.update(cx, |store, cx| {
- store.archive(&acp::SessionId::new("session-1"), cx);
+ store.archive(
+ &acp::SessionId::new("session-1"),
+ None::<fn(smol::channel::Receiver<()>) -> std::future::Ready<()>>,
+ cx,
+ );
});
});
@@ -2107,7 +2144,11 @@ mod tests {
cx.update(|cx| {
let store = ThreadMetadataStore::global(cx);
store.update(cx, |store, cx| {
- store.archive(&acp::SessionId::new("nonexistent"), cx);
+ store.archive(
+ &acp::SessionId::new("nonexistent"),
+ None::<fn(smol::channel::Receiver<()>) -> std::future::Ready<()>>,
+ cx,
+ );
});
});
@@ -2136,7 +2177,11 @@ mod tests {
let store = ThreadMetadataStore::global(cx);
store.update(cx, |store, cx| {
store.save(metadata.clone(), cx);
- store.archive(&session_id, cx);
+ store.archive(
+ &session_id,
+ None::<fn(smol::channel::Receiver<()>) -> std::future::Ready<()>>,
+ cx,
+ );
});
});
@@ -1,223 +1,52 @@
use std::{
- collections::HashSet,
path::{Path, PathBuf},
sync::Arc,
};
use agent_client_protocol as acp;
use anyhow::{Context as _, Result, anyhow};
-use collections::HashMap;
use git::repository::{AskPassDelegate, CommitOptions, ResetMode};
-use gpui::{App, AsyncApp, Entity, Global, Task, WindowHandle};
-use parking_lot::Mutex;
+use gpui::{App, AsyncApp, Entity, Task, WindowHandle};
use project::{
LocalProjectFlags, Project, WorktreeId,
git_store::{Repository, resolve_git_worktree_to_main_repo},
};
use util::ResultExt;
-use workspace::{
- AppState, MultiWorkspace, OpenMode, OpenOptions, PathList, Toast, Workspace,
- notifications::NotificationId, open_new, open_paths,
-};
+use workspace::{AppState, MultiWorkspace, PathList, Workspace};
use crate::thread_metadata_store::{ArchivedGitWorktree, ThreadMetadataStore};
-#[derive(Default)]
-pub struct ThreadArchiveCleanupCoordinator {
- in_flight_roots: Mutex<HashSet<PathBuf>>,
-}
-
-impl Global for ThreadArchiveCleanupCoordinator {}
-
-fn ensure_global(cx: &mut App) {
- if !cx.has_global::<ThreadArchiveCleanupCoordinator>() {
- cx.set_global(ThreadArchiveCleanupCoordinator::default());
- }
-}
-
-#[derive(Clone)]
-pub struct ArchiveOutcome {
- pub archived_immediately: bool,
- pub roots_to_delete: Vec<PathBuf>,
-}
-
-#[derive(Clone)]
-struct RootPlan {
- root_path: PathBuf,
- main_repo_path: PathBuf,
- affected_projects: Vec<AffectedProject>,
- worktree_repo: Option<Entity<Repository>>,
- branch_name: Option<String>,
-}
-
-#[derive(Clone)]
-struct AffectedProject {
- project: Entity<Project>,
- worktree_id: WorktreeId,
-}
-
#[derive(Clone)]
-enum FallbackTarget {
- ExistingWorkspace {
- window: WindowHandle<MultiWorkspace>,
- workspace: Entity<Workspace>,
- },
- OpenPaths {
- requesting_window: WindowHandle<MultiWorkspace>,
- paths: Vec<PathBuf>,
- },
- OpenEmpty {
- requesting_window: WindowHandle<MultiWorkspace>,
- },
+pub struct RootPlan {
+ pub root_path: PathBuf,
+ pub main_repo_path: PathBuf,
+ pub affected_projects: Vec<AffectedProject>,
+ pub worktree_repo: Option<Entity<Repository>>,
+ pub branch_name: Option<String>,
}
#[derive(Clone)]
-struct CleanupPlan {
- folder_paths: PathList,
- roots: Vec<RootPlan>,
- current_workspace: Option<Entity<Workspace>>,
- current_workspace_will_be_empty: bool,
- fallback: Option<FallbackTarget>,
- affected_workspaces: Vec<Entity<Workspace>>,
+pub struct AffectedProject {
+ pub project: Entity<Project>,
+ pub worktree_id: WorktreeId,
}
fn archived_worktree_ref_name(id: i64) -> String {
format!("refs/archived-worktrees/{}", id)
}
-struct PersistOutcome {
- archived_worktree_id: i64,
- staged_commit_hash: String,
-}
-
-pub fn archive_thread(
- session_id: &acp::SessionId,
- current_workspace: Option<Entity<Workspace>>,
- window: Option<WindowHandle<MultiWorkspace>>,
- cx: &mut App,
-) -> ArchiveOutcome {
- ensure_global(cx);
- let plan =
- window.and_then(|window| build_cleanup_plan(session_id, current_workspace, window, cx));
-
- ThreadMetadataStore::global(cx).update(cx, |store, cx| store.archive(session_id, cx));
-
- if let Some(plan) = plan {
- let roots_to_delete = plan
- .roots
- .iter()
- .map(|root| root.root_path.clone())
- .collect::<Vec<_>>();
- if !roots_to_delete.is_empty() {
- cx.spawn(async move |cx| {
- run_cleanup(plan, cx).await;
- })
- .detach();
-
- return ArchiveOutcome {
- archived_immediately: true,
- roots_to_delete,
- };
- }
- }
-
- ArchiveOutcome {
- archived_immediately: true,
- roots_to_delete: Vec::new(),
- }
+pub struct PersistOutcome {
+ pub archived_worktree_id: i64,
+ pub staged_commit_hash: String,
}
-fn build_cleanup_plan(
- session_id: &acp::SessionId,
- current_workspace: Option<Entity<Workspace>>,
- requesting_window: WindowHandle<MultiWorkspace>,
+pub fn build_root_plan(
+ path: &Path,
+ workspaces: &[Entity<Workspace>],
cx: &App,
-) -> Option<CleanupPlan> {
- let metadata = ThreadMetadataStore::global(cx)
- .read(cx)
- .entry(session_id)
- .cloned()?;
-
- let workspaces = all_open_workspaces(cx);
-
- let candidate_roots = metadata
- .folder_paths
- .ordered_paths()
- .filter_map(|path| build_root_plan(path, &workspaces, cx))
- .filter(|plan| {
- !path_is_referenced_by_other_unarchived_threads(session_id, &plan.root_path, cx)
- })
- .collect::<Vec<_>>();
-
- if candidate_roots.is_empty() {
- return Some(CleanupPlan {
- folder_paths: metadata.folder_paths,
- roots: Vec::new(),
- current_workspace,
- current_workspace_will_be_empty: false,
- fallback: None,
- affected_workspaces: Vec::new(),
- });
- }
-
- let mut affected_workspaces = Vec::new();
- let mut current_workspace_will_be_empty = false;
-
- for workspace in workspaces.iter() {
- let doomed_root_count = workspace
- .read(cx)
- .root_paths(cx)
- .into_iter()
- .filter(|path| {
- candidate_roots
- .iter()
- .any(|root| root.root_path.as_path() == path.as_ref())
- })
- .count();
-
- if doomed_root_count == 0 {
- continue;
- }
-
- let surviving_root_count = workspace
- .read(cx)
- .root_paths(cx)
- .len()
- .saturating_sub(doomed_root_count);
- if current_workspace
- .as_ref()
- .is_some_and(|current| current == workspace)
- {
- current_workspace_will_be_empty = surviving_root_count == 0;
- }
- affected_workspaces.push(workspace.clone());
- }
-
- let fallback = if current_workspace_will_be_empty {
- choose_fallback_target(
- session_id,
- current_workspace.as_ref(),
- &candidate_roots,
- &requesting_window,
- &workspaces,
- cx,
- )
- } else {
- None
- };
-
- Some(CleanupPlan {
- folder_paths: metadata.folder_paths,
- roots: candidate_roots,
- current_workspace,
- current_workspace_will_be_empty,
- fallback,
- affected_workspaces,
- })
-}
-
-fn build_root_plan(path: &Path, workspaces: &[Entity<Workspace>], cx: &App) -> Option<RootPlan> {
+) -> Option<RootPlan> {
let path = path.to_path_buf();
+
let affected_projects = workspaces
.iter()
.filter_map(|workspace| {
@@ -267,7 +96,7 @@ fn build_root_plan(path: &Path, workspaces: &[Entity<Workspace>], cx: &App) -> O
})
}
-fn path_is_referenced_by_other_unarchived_threads(
+pub fn path_is_referenced_by_other_unarchived_threads(
current_session_id: &acp::SessionId,
path: &Path,
cx: &App,
@@ -286,272 +115,7 @@ fn path_is_referenced_by_other_unarchived_threads(
})
}
-fn choose_fallback_target(
- current_session_id: &acp::SessionId,
- current_workspace: Option<&Entity<Workspace>>,
- roots: &[RootPlan],
- requesting_window: &WindowHandle<MultiWorkspace>,
- workspaces: &[Entity<Workspace>],
- cx: &App,
-) -> Option<FallbackTarget> {
- let doomed_roots = roots
- .iter()
- .map(|root| root.root_path.clone())
- .collect::<HashSet<_>>();
-
- let surviving_same_window = requesting_window.read(cx).ok().and_then(|multi_workspace| {
- multi_workspace
- .workspaces()
- .iter()
- .filter(|workspace| current_workspace.is_none_or(|current| *workspace != current))
- .find(|workspace| workspace_survives(workspace, &doomed_roots, cx))
- .cloned()
- });
- if let Some(workspace) = surviving_same_window {
- return Some(FallbackTarget::ExistingWorkspace {
- window: *requesting_window,
- workspace,
- });
- }
-
- for window in cx
- .windows()
- .into_iter()
- .filter_map(|window| window.downcast::<MultiWorkspace>())
- {
- if window == *requesting_window {
- continue;
- }
- if let Ok(multi_workspace) = window.read(cx) {
- if let Some(workspace) = multi_workspace
- .workspaces()
- .iter()
- .find(|workspace| workspace_survives(workspace, &doomed_roots, cx))
- .cloned()
- {
- return Some(FallbackTarget::ExistingWorkspace { window, workspace });
- }
- }
- }
-
- let safe_thread_workspace = ThreadMetadataStore::global(cx)
- .read(cx)
- .entries()
- .filter(|metadata| metadata.session_id != *current_session_id && !metadata.archived)
- .filter_map(|metadata| {
- workspaces
- .iter()
- .find(|workspace| workspace_path_list(workspace, cx) == metadata.folder_paths)
- .cloned()
- })
- .find(|workspace| workspace_survives(workspace, &doomed_roots, cx));
-
- if let Some(workspace) = safe_thread_workspace {
- let window = window_for_workspace(&workspace, cx).unwrap_or(*requesting_window);
- return Some(FallbackTarget::ExistingWorkspace { window, workspace });
- }
-
- if let Some(root) = roots.first() {
- return Some(FallbackTarget::OpenPaths {
- requesting_window: *requesting_window,
- paths: vec![root.main_repo_path.clone()],
- });
- }
-
- Some(FallbackTarget::OpenEmpty {
- requesting_window: *requesting_window,
- })
-}
-
-async fn run_cleanup(plan: CleanupPlan, cx: &mut AsyncApp) {
- let roots_to_delete =
- cx.update_global::<ThreadArchiveCleanupCoordinator, _>(|coordinator, _cx| {
- let mut in_flight_roots = coordinator.in_flight_roots.lock();
- plan.roots
- .iter()
- .filter_map(|root| {
- if in_flight_roots.insert(root.root_path.clone()) {
- Some(root.clone())
- } else {
- None
- }
- })
- .collect::<Vec<_>>()
- });
-
- if roots_to_delete.is_empty() {
- return;
- }
-
- let active_workspace = plan.current_workspace.clone();
- if let Some(workspace) = active_workspace
- .as_ref()
- .filter(|_| plan.current_workspace_will_be_empty)
- {
- let Some(window) = window_for_workspace_async(workspace, cx) else {
- release_in_flight_roots(&roots_to_delete, cx);
- return;
- };
-
- let should_continue = save_workspace_for_root_removal(workspace.clone(), window, cx).await;
- if !should_continue {
- release_in_flight_roots(&roots_to_delete, cx);
- return;
- }
- }
-
- for workspace in plan
- .affected_workspaces
- .iter()
- .filter(|workspace| Some((*workspace).clone()) != active_workspace)
- {
- let Some(window) = window_for_workspace_async(workspace, cx) else {
- continue;
- };
-
- if !save_workspace_for_root_removal(workspace.clone(), window, cx).await {
- release_in_flight_roots(&roots_to_delete, cx);
- return;
- }
- }
-
- if plan.current_workspace_will_be_empty {
- if let Some(fallback) = plan.fallback.clone() {
- activate_fallback(fallback, cx).await.log_err();
- }
- }
-
- let mut git_removal_errors: Vec<(PathBuf, anyhow::Error)> = Vec::new();
- let mut persist_errors: Vec<(PathBuf, anyhow::Error)> = Vec::new();
- let mut persist_outcomes: HashMap<PathBuf, PersistOutcome> = HashMap::default();
-
- for root in &roots_to_delete {
- if root.worktree_repo.is_some() {
- match persist_worktree_state(root, &plan, cx).await {
- Ok(outcome) => {
- persist_outcomes.insert(root.root_path.clone(), outcome);
- }
- Err(error) => {
- log::error!(
- "Failed to persist worktree state for {}: {error}",
- root.root_path.display()
- );
- persist_errors.push((root.root_path.clone(), error));
- continue;
- }
- }
- }
-
- if let Err(error) = remove_root(root.clone(), cx).await {
- if let Some(outcome) = persist_outcomes.remove(&root.root_path) {
- rollback_persist(&outcome, root, cx).await;
- }
- git_removal_errors.push((root.root_path.clone(), error));
- }
- }
-
- cleanup_empty_workspaces(&plan.affected_workspaces, cx).await;
-
- let all_errors: Vec<(PathBuf, anyhow::Error)> = persist_errors
- .into_iter()
- .chain(git_removal_errors)
- .collect();
-
- if !all_errors.is_empty() {
- let detail = all_errors
- .into_iter()
- .map(|(path, error)| format!("{}: {error}", path.display()))
- .collect::<Vec<_>>()
- .join("\n");
- show_error_toast(
- "Thread archived, but linked worktree cleanup failed",
- &detail,
- &plan,
- cx,
- );
- }
-
- release_in_flight_roots(&roots_to_delete, cx);
-}
-
-async fn save_workspace_for_root_removal(
- workspace: Entity<Workspace>,
- window: WindowHandle<MultiWorkspace>,
- cx: &mut AsyncApp,
-) -> bool {
- let has_dirty_items = workspace.read_with(cx, |workspace, cx| {
- workspace.items(cx).any(|item| item.is_dirty(cx))
- });
-
- if has_dirty_items {
- let _ = window.update(cx, |multi_workspace, window, cx| {
- window.activate_window();
- multi_workspace.activate(workspace.clone(), window, cx);
- });
- }
-
- let save_task = window.update(cx, |_multi_workspace, window, cx| {
- workspace.update(cx, |workspace, cx| {
- workspace.prompt_to_save_or_discard_dirty_items(window, cx)
- })
- });
-
- let Ok(task) = save_task else {
- return false;
- };
-
- task.await.unwrap_or(false)
-}
-
-async fn activate_fallback(target: FallbackTarget, cx: &mut AsyncApp) -> Result<()> {
- match target {
- FallbackTarget::ExistingWorkspace { window, workspace } => {
- window.update(cx, |multi_workspace, window, cx| {
- window.activate_window();
- multi_workspace.activate(workspace, window, cx);
- })?;
- }
- FallbackTarget::OpenPaths {
- requesting_window,
- paths,
- } => {
- let app_state = current_app_state(cx).context("no workspace app state available")?;
- cx.update(|cx| {
- open_paths(
- &paths,
- app_state,
- OpenOptions {
- requesting_window: Some(requesting_window),
- open_mode: OpenMode::Activate,
- ..Default::default()
- },
- cx,
- )
- })
- .await?;
- }
- FallbackTarget::OpenEmpty { requesting_window } => {
- let app_state = current_app_state(cx).context("no workspace app state available")?;
- cx.update(|cx| {
- open_new(
- OpenOptions {
- requesting_window: Some(requesting_window),
- open_mode: OpenMode::Activate,
- ..Default::default()
- },
- app_state,
- cx,
- |_workspace, _window, _cx| {},
- )
- })
- .await?;
- }
- }
-
- Ok(())
-}
-
-async fn remove_root(root: RootPlan, cx: &mut AsyncApp) -> Result<()> {
+pub async fn remove_root(root: RootPlan, cx: &mut AsyncApp) -> Result<()> {
let release_tasks: Vec<_> = root
.affected_projects
.iter()
@@ -687,9 +251,9 @@ async fn rollback_root(root: &RootPlan, cx: &mut AsyncApp) {
}
}
-async fn persist_worktree_state(
+pub async fn persist_worktree_state(
root: &RootPlan,
- plan: &CleanupPlan,
+ folder_paths: &PathList,
cx: &mut AsyncApp,
) -> Result<PersistOutcome> {
let worktree_repo = root
@@ -833,7 +397,7 @@ async fn persist_worktree_state(
// Link all threads on this worktree to the archived record
let session_ids: Vec<acp::SessionId> = store.read_with(cx, |store, _cx| {
store
- .all_session_ids_for_path(&plan.folder_paths)
+ .all_session_ids_for_path(folder_paths)
.cloned()
.collect()
});
@@ -900,7 +464,7 @@ async fn persist_worktree_state(
})
}
-async fn rollback_persist(outcome: &PersistOutcome, root: &RootPlan, cx: &mut AsyncApp) {
+pub async fn rollback_persist(outcome: &PersistOutcome, root: &RootPlan, cx: &mut AsyncApp) {
// Undo WIP commits on the worktree repo
if let Some(worktree_repo) = &root.worktree_repo {
let rx = worktree_repo.update(cx, |repo, cx| {
@@ -934,34 +498,6 @@ async fn rollback_persist(outcome: &PersistOutcome, root: &RootPlan, cx: &mut As
}
}
-async fn cleanup_empty_workspaces(workspaces: &[Entity<Workspace>], cx: &mut AsyncApp) {
- for workspace in workspaces {
- let is_empty = match workspace
- .downgrade()
- .read_with(cx, |workspace, cx| workspace.root_paths(cx).is_empty())
- {
- Ok(is_empty) => is_empty,
- Err(_) => {
- log::debug!("Workspace entity already dropped during cleanup; skipping");
- continue;
- }
- };
- if !is_empty {
- continue;
- }
-
- let Some(window) = window_for_workspace_async(workspace, cx) else {
- continue;
- };
-
- let _ = window.update(cx, |multi_workspace, window, cx| {
- if !multi_workspace.remove(workspace, window, cx) {
- window.remove_window();
- }
- });
- }
-}
-
pub async fn restore_worktree_via_git(
row: &ArchivedGitWorktree,
cx: &mut AsyncApp,
@@ -1174,34 +710,7 @@ pub async fn cleanup_archived_worktree_record(row: &ArchivedGitWorktree, cx: &mu
.log_err();
}
-fn show_error_toast(summary: &str, detail: &str, plan: &CleanupPlan, cx: &mut AsyncApp) {
- let target_workspace = plan
- .current_workspace
- .clone()
- .or_else(|| plan.affected_workspaces.first().cloned());
- let Some(workspace) = target_workspace else {
- return;
- };
-
- let _ = workspace.update(cx, |workspace, cx| {
- struct ArchiveCleanupErrorToast;
- let message = if detail.is_empty() {
- summary.to_string()
- } else {
- format!("{summary}: {detail}")
- };
- workspace.show_toast(
- Toast::new(
- NotificationId::unique::<ArchiveCleanupErrorToast>(),
- message,
- )
- .autohide(),
- cx,
- );
- });
-}
-
-fn all_open_workspaces(cx: &App) -> Vec<Entity<Workspace>> {
+pub fn all_open_workspaces(cx: &App) -> Vec<Entity<Workspace>> {
cx.windows()
.into_iter()
.filter_map(|window| window.downcast::<MultiWorkspace>())
@@ -1214,22 +723,6 @@ fn all_open_workspaces(cx: &App) -> Vec<Entity<Workspace>> {
.collect()
}
-fn workspace_survives(
- workspace: &Entity<Workspace>,
- doomed_roots: &HashSet<PathBuf>,
- cx: &App,
-) -> bool {
- workspace
- .read(cx)
- .root_paths(cx)
- .into_iter()
- .any(|root| !doomed_roots.contains(root.as_ref()))
-}
-
-fn workspace_path_list(workspace: &Entity<Workspace>, cx: &App) -> PathList {
- PathList::new(&workspace.read(cx).root_paths(cx))
-}
-
fn window_for_workspace(
workspace: &Entity<Workspace>,
cx: &App,
@@ -1261,12 +754,3 @@ fn current_app_state(cx: &mut AsyncApp) -> Option<Arc<AppState>> {
.map(|workspace| workspace.read(cx).app_state().clone())
})
}
-
-fn release_in_flight_roots(roots: &[RootPlan], cx: &mut AsyncApp) {
- cx.update_global::<ThreadArchiveCleanupCoordinator, _>(|coordinator, _cx| {
- let mut in_flight_roots = coordinator.in_flight_roots.lock();
- for root in roots {
- in_flight_roots.remove(&root.root_path);
- }
- });
-}