Detailed changes
@@ -235,7 +235,7 @@ impl ActionLog {
.await;
diff.update(cx, |diff, cx| {
- diff.set_snapshot(diff_snapshot, &buffer_snapshot, None, cx)
+ diff.set_snapshot(diff_snapshot, &buffer_snapshot, cx)
})?;
}
this.update(cx, |this, cx| {
@@ -1,13 +1,21 @@
use futures::channel::oneshot;
use git2::{DiffLineType as GitDiffLineType, DiffOptions as GitOptions, Patch as GitPatch};
-use gpui::{App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Task};
+use gpui::{App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Task, TaskLabel};
use language::{Language, LanguageRegistry};
use rope::Rope;
-use std::{cmp::Ordering, future::Future, iter, mem, ops::Range, sync::Arc};
+use std::{
+ cmp::Ordering,
+ future::Future,
+ iter,
+ ops::Range,
+ sync::{Arc, LazyLock},
+};
use sum_tree::SumTree;
use text::{Anchor, Bias, BufferId, OffsetRangeExt, Point, ToOffset as _};
use util::ResultExt;
+pub static CALCULATE_DIFF_TASK: LazyLock<TaskLabel> = LazyLock::new(TaskLabel::new);
+
pub struct BufferDiff {
pub buffer_id: BufferId,
inner: BufferDiffInner,
@@ -181,10 +189,12 @@ impl BufferDiffSnapshot {
base_text_exists = false;
};
- let hunks = cx.background_spawn({
- let buffer = buffer.clone();
- async move { compute_hunks(base_text_pair, buffer) }
- });
+ let hunks = cx
+ .background_executor()
+ .spawn_labeled(*CALCULATE_DIFF_TASK, {
+ let buffer = buffer.clone();
+ async move { compute_hunks(base_text_pair, buffer) }
+ });
async move {
let (base_text, hunks) = futures::join!(base_text_snapshot, hunks);
@@ -208,17 +218,18 @@ impl BufferDiffSnapshot {
) -> impl Future<Output = Self> + use<> {
let base_text_exists = base_text.is_some();
let base_text_pair = base_text.map(|text| (text, base_text_snapshot.as_rope().clone()));
- cx.background_spawn(async move {
- Self {
- inner: BufferDiffInner {
- base_text: base_text_snapshot,
- pending_hunks: SumTree::new(&buffer),
- hunks: compute_hunks(base_text_pair, buffer),
- base_text_exists,
- },
- secondary_diff: None,
- }
- })
+ cx.background_executor()
+ .spawn_labeled(*CALCULATE_DIFF_TASK, async move {
+ Self {
+ inner: BufferDiffInner {
+ base_text: base_text_snapshot,
+ pending_hunks: SumTree::new(&buffer),
+ hunks: compute_hunks(base_text_pair, buffer),
+ base_text_exists,
+ },
+ secondary_diff: None,
+ }
+ })
}
#[cfg(test)]
@@ -381,6 +392,7 @@ impl BufferDiffInner {
while let Some(PendingHunk {
buffer_range,
diff_base_byte_range,
+ new_status,
..
}) = pending_hunks_iter.next()
{
@@ -439,16 +451,23 @@ impl BufferDiffInner {
let index_end = prev_unstaged_hunk_base_text_end + end_overshoot;
let index_byte_range = index_start..index_end;
- let replacement_text = if stage {
- log::debug!("stage hunk {:?}", buffer_offset_range);
- buffer
- .text_for_range(buffer_offset_range)
- .collect::<String>()
- } else {
- log::debug!("unstage hunk {:?}", buffer_offset_range);
- head_text
- .chunks_in_range(diff_base_byte_range.clone())
- .collect::<String>()
+ let replacement_text = match new_status {
+ DiffHunkSecondaryStatus::SecondaryHunkRemovalPending => {
+ log::debug!("staging hunk {:?}", buffer_offset_range);
+ buffer
+ .text_for_range(buffer_offset_range)
+ .collect::<String>()
+ }
+ DiffHunkSecondaryStatus::SecondaryHunkAdditionPending => {
+ log::debug!("unstaging hunk {:?}", buffer_offset_range);
+ head_text
+ .chunks_in_range(diff_base_byte_range.clone())
+ .collect::<String>()
+ }
+ _ => {
+ debug_assert!(false);
+ continue;
+ }
};
edits.push((index_byte_range, replacement_text));
@@ -631,28 +650,6 @@ impl BufferDiffInner {
})
}
- fn set_state(
- &mut self,
- new_state: Self,
- buffer: &text::BufferSnapshot,
- ) -> Option<Range<Anchor>> {
- let (base_text_changed, changed_range) =
- match (self.base_text_exists, new_state.base_text_exists) {
- (false, false) => (true, None),
- (true, true) if self.base_text.remote_id() == new_state.base_text.remote_id() => {
- (false, new_state.compare(&self, buffer))
- }
- _ => (true, Some(text::Anchor::MIN..text::Anchor::MAX)),
- };
-
- let pending_hunks = mem::replace(&mut self.pending_hunks, SumTree::new(buffer));
- *self = new_state;
- if !base_text_changed {
- self.pending_hunks = pending_hunks;
- }
- changed_range
- }
-
fn compare(&self, old: &Self, new_snapshot: &text::BufferSnapshot) -> Option<Range<Anchor>> {
let mut new_cursor = self.hunks.cursor::<()>(new_snapshot);
let mut old_cursor = old.hunks.cursor::<()>(new_snapshot);
@@ -1011,26 +1008,61 @@ impl BufferDiff {
&mut self,
new_snapshot: BufferDiffSnapshot,
buffer: &text::BufferSnapshot,
- secondary_changed_range: Option<Range<Anchor>>,
cx: &mut Context<Self>,
) -> Option<Range<Anchor>> {
- let changed_range = self.inner.set_state(new_snapshot.inner, buffer);
-
- let changed_range = match (secondary_changed_range, changed_range) {
- (None, None) => None,
- (Some(unstaged_range), None) => self.range_to_hunk_range(unstaged_range, &buffer, cx),
- (None, Some(uncommitted_range)) => Some(uncommitted_range),
- (Some(unstaged_range), Some(uncommitted_range)) => {
- let mut start = uncommitted_range.start;
- let mut end = uncommitted_range.end;
- if let Some(unstaged_range) = self.range_to_hunk_range(unstaged_range, &buffer, cx)
- {
- start = unstaged_range.start.min(&uncommitted_range.start, &buffer);
- end = unstaged_range.end.max(&uncommitted_range.end, &buffer);
+ self.set_snapshot_with_secondary(new_snapshot, buffer, None, false, cx)
+ }
+
+ pub fn set_snapshot_with_secondary(
+ &mut self,
+ new_snapshot: BufferDiffSnapshot,
+ buffer: &text::BufferSnapshot,
+ secondary_diff_change: Option<Range<Anchor>>,
+ clear_pending_hunks: bool,
+ cx: &mut Context<Self>,
+ ) -> Option<Range<Anchor>> {
+ log::debug!("set snapshot with secondary {secondary_diff_change:?}");
+
+ let state = &mut self.inner;
+ let new_state = new_snapshot.inner;
+ let (base_text_changed, mut changed_range) =
+ match (state.base_text_exists, new_state.base_text_exists) {
+ (false, false) => (true, None),
+ (true, true) if state.base_text.remote_id() == new_state.base_text.remote_id() => {
+ (false, new_state.compare(&state, buffer))
+ }
+ _ => (true, Some(text::Anchor::MIN..text::Anchor::MAX)),
+ };
+
+ if let Some(secondary_changed_range) = secondary_diff_change {
+ if let Some(secondary_hunk_range) =
+ self.range_to_hunk_range(secondary_changed_range, &buffer, cx)
+ {
+ if let Some(range) = &mut changed_range {
+ range.start = secondary_hunk_range.start.min(&range.start, &buffer);
+ range.end = secondary_hunk_range.end.max(&range.end, &buffer);
+ } else {
+ changed_range = Some(secondary_hunk_range);
}
- Some(start..end)
}
- };
+ }
+
+ let state = &mut self.inner;
+ state.base_text_exists = new_state.base_text_exists;
+ state.base_text = new_state.base_text;
+ state.hunks = new_state.hunks;
+ if base_text_changed || clear_pending_hunks {
+ if let Some((first, last)) = state.pending_hunks.first().zip(state.pending_hunks.last())
+ {
+ if let Some(range) = &mut changed_range {
+ range.start = range.start.min(&first.buffer_range.start, &buffer);
+ range.end = range.end.max(&last.buffer_range.end, &buffer);
+ } else {
+ changed_range = Some(first.buffer_range.start..last.buffer_range.end);
+ }
+ }
+ state.pending_hunks = SumTree::new(buffer);
+ }
cx.emit(BufferDiffEvent::DiffChanged {
changed_range: changed_range.clone(),
@@ -1138,7 +1170,7 @@ impl BufferDiff {
return;
};
this.update(cx, |this, cx| {
- this.set_snapshot(snapshot, &buffer, None, cx);
+ this.set_snapshot(snapshot, &buffer, cx);
})
.log_err();
drop(complete_on_drop)
@@ -1163,7 +1195,7 @@ impl BufferDiff {
cx,
);
let snapshot = cx.background_executor().block(snapshot);
- self.set_snapshot(snapshot, &buffer, None, cx);
+ self.set_snapshot(snapshot, &buffer, cx);
}
}
@@ -1752,13 +1784,13 @@ mod tests {
let unstaged_diff = cx.new(|cx| {
let mut diff = BufferDiff::new(&buffer, cx);
- diff.set_snapshot(unstaged, &buffer, None, cx);
+ diff.set_snapshot(unstaged, &buffer, cx);
diff
});
let uncommitted_diff = cx.new(|cx| {
let mut diff = BufferDiff::new(&buffer, cx);
- diff.set_snapshot(uncommitted, &buffer, None, cx);
+ diff.set_snapshot(uncommitted, &buffer, cx);
diff.set_secondary_diff(unstaged_diff);
diff
});
@@ -1819,12 +1851,12 @@ mod tests {
let uncommitted = BufferDiffSnapshot::new_sync(buffer.clone(), head_text.clone(), cx);
let unstaged_diff = cx.new(|cx| {
let mut diff = BufferDiff::new(&buffer, cx);
- diff.set_snapshot(unstaged, &buffer, None, cx);
+ diff.set_snapshot(unstaged, &buffer, cx);
diff
});
let uncommitted_diff = cx.new(|cx| {
let mut diff = BufferDiff::new(&buffer, cx);
- diff.set_snapshot(uncommitted, &buffer, None, cx);
+ diff.set_snapshot(uncommitted, &buffer, cx);
diff.set_secondary_diff(unstaged_diff.clone());
diff
});
@@ -356,7 +356,7 @@ async fn build_buffer_diff(
cx.new(|cx| {
let mut diff = BufferDiff::new(&buffer.text, cx);
- diff.set_snapshot(diff_snapshot, &buffer.text, None, cx);
+ diff.set_snapshot(diff_snapshot, &buffer.text, cx);
diff
})
}
@@ -1501,7 +1501,6 @@ mod tests {
.unindent(),
);
- eprintln!(">>>>>>>> git restore");
let prev_buffer_hunks =
cx.update_window_entity(&buffer_editor, |buffer_editor, window, cx| {
let snapshot = buffer_editor.snapshot(window, cx);
@@ -1525,7 +1524,6 @@ mod tests {
});
assert_eq!(new_buffer_hunks.as_slice(), &[]);
- eprintln!(">>>>>>>> modify");
cx.update_window_entity(&buffer_editor, |buffer_editor, window, cx| {
buffer_editor.set_text("different\n", window, cx);
buffer_editor.save(false, project.clone(), window, cx)
@@ -1554,8 +1552,8 @@ mod tests {
cx,
&"
- original
- + หdifferent
- "
+ + different
+ ห"
.unindent(),
);
}
@@ -38,6 +38,7 @@ use language::{
proto::{deserialize_version, serialize_version},
};
use parking_lot::Mutex;
+use postage::stream::Stream as _;
use rpc::{
AnyProtoClient, TypedEnvelope,
proto::{self, FromProto, SSH_PROJECT_ID, ToProto, git_reset, split_repository_update},
@@ -83,15 +84,28 @@ struct SharedDiffs {
uncommitted: Option<Entity<BufferDiff>>,
}
-#[derive(Default)]
struct BufferDiffState {
unstaged_diff: Option<WeakEntity<BufferDiff>>,
uncommitted_diff: Option<WeakEntity<BufferDiff>>,
recalculate_diff_task: Option<Task<Result<()>>>,
language: Option<Arc<Language>>,
language_registry: Option<Arc<LanguageRegistry>>,
- diff_updated_futures: Vec<oneshot::Sender<()>>,
+ recalculating_tx: postage::watch::Sender<bool>,
+
+ /// These operation counts are used to ensure that head and index text
+ /// values read from the git repository are up-to-date with any hunk staging
+ /// operations that have been performed on the BufferDiff.
+ ///
+ /// The operation_count is incremented immediately when the user initiates a
+ /// hunk stage/unstage operation. Then, upon writing the new index text do
+ /// disk, the `operation_count_as_of_write` is updated to reflect the
+ /// operation_count that prompted the write. Finally, when reloading
+ /// index/head text from disk in response to a filesystem event, the
+ /// `operation_count_as_of_read` is updated to reflect the latest previous
+ /// write.
hunk_staging_operation_count: usize,
+ hunk_staging_operation_count_as_of_write: usize,
+ hunk_staging_operation_count_as_of_read: usize,
head_text: Option<Arc<String>>,
index_text: Option<Arc<String>>,
@@ -299,7 +313,7 @@ pub struct GitJob {
#[derive(PartialEq, Eq)]
enum GitJobKey {
WriteIndex(RepoPath),
- BatchReadIndex,
+ ReloadBufferDiffBases,
RefreshStatuses,
ReloadGitState,
}
@@ -551,7 +565,7 @@ impl GitStore {
diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
{
return cx.background_executor().spawn(async move {
- task.await?;
+ task.await;
Ok(unstaged_diff)
});
}
@@ -608,7 +622,7 @@ impl GitStore {
diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
{
return cx.background_executor().spawn(async move {
- task.await?;
+ task.await;
Ok(uncommitted_diff)
});
}
@@ -697,10 +711,13 @@ impl GitStore {
}
}
- let rx = diff_state.diff_bases_changed(text_snapshot, diff_bases_change, 0, cx);
+ diff_state.diff_bases_changed(text_snapshot, Some(diff_bases_change), cx);
+ let rx = diff_state.wait_for_recalculation();
anyhow::Ok(async move {
- rx.await.ok();
+ if let Some(rx) = rx {
+ rx.await;
+ }
Ok(diff)
})
})
@@ -1223,13 +1240,10 @@ impl GitStore {
for buffer in buffers {
if let Some(diff_state) = self.diffs.get_mut(&buffer.read(cx).remote_id()) {
let buffer = buffer.read(cx).text_snapshot();
- futures.push(diff_state.update(cx, |diff_state, cx| {
- diff_state.recalculate_diffs(
- buffer,
- diff_state.hunk_staging_operation_count,
- cx,
- )
- }));
+ diff_state.update(cx, |diff_state, cx| {
+ diff_state.recalculate_diffs(buffer, cx);
+ futures.extend(diff_state.wait_for_recalculation());
+ });
}
}
async move {
@@ -1246,31 +1260,33 @@ impl GitStore {
if let BufferDiffEvent::HunksStagedOrUnstaged(new_index_text) = event {
let buffer_id = diff.read(cx).buffer_id;
if let Some(diff_state) = self.diffs.get(&buffer_id) {
- diff_state.update(cx, |diff_state, _| {
+ let hunk_staging_operation_count = diff_state.update(cx, |diff_state, _| {
diff_state.hunk_staging_operation_count += 1;
+ diff_state.hunk_staging_operation_count
});
- }
- if let Some((repo, path)) = self.repository_and_path_for_buffer_id(buffer_id, cx) {
- let recv = repo.update(cx, |repo, cx| {
- log::debug!("updating index text for buffer {}", path.display());
- repo.spawn_set_index_text_job(
- path,
- new_index_text.as_ref().map(|rope| rope.to_string()),
- cx,
- )
- });
- let diff = diff.downgrade();
- cx.spawn(async move |this, cx| {
- if let Ok(Err(error)) = cx.background_spawn(recv).await {
- diff.update(cx, |diff, cx| {
- diff.clear_pending_hunks(cx);
- })
- .ok();
- this.update(cx, |_, cx| cx.emit(GitStoreEvent::IndexWriteError(error)))
+ if let Some((repo, path)) = self.repository_and_path_for_buffer_id(buffer_id, cx) {
+ let recv = repo.update(cx, |repo, cx| {
+ log::debug!("hunks changed for {}", path.display());
+ repo.spawn_set_index_text_job(
+ path,
+ new_index_text.as_ref().map(|rope| rope.to_string()),
+ hunk_staging_operation_count,
+ cx,
+ )
+ });
+ let diff = diff.downgrade();
+ cx.spawn(async move |this, cx| {
+ if let Ok(Err(error)) = cx.background_spawn(recv).await {
+ diff.update(cx, |diff, cx| {
+ diff.clear_pending_hunks(cx);
+ })
.ok();
- }
- })
- .detach();
+ this.update(cx, |_, cx| cx.emit(GitStoreEvent::IndexWriteError(error)))
+ .ok();
+ }
+ })
+ .detach();
+ }
}
}
}
@@ -1284,178 +1300,15 @@ impl GitStore {
log::debug!("local worktree repos changed");
debug_assert!(worktree.read(cx).is_local());
- let mut diff_state_updates = HashMap::<Entity<Repository>, Vec<_>>::default();
- for (buffer_id, diff_state) in &self.diffs {
- let Some(buffer) = self.buffer_store.read(cx).get(*buffer_id) else {
- continue;
- };
- let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
- continue;
- };
- if file.worktree != worktree {
- continue;
- }
- let Some((repo, repo_path)) =
- self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
- else {
- continue;
- };
- if !changed_repos.iter().any(|update| {
- update.old_work_directory_abs_path.as_ref()
- == Some(&repo.read(cx).work_directory_abs_path)
- || update.new_work_directory_abs_path.as_ref()
- == Some(&repo.read(cx).work_directory_abs_path)
- }) {
- continue;
- }
-
- let diff_state = diff_state.read(cx);
- let has_unstaged_diff = diff_state
- .unstaged_diff
- .as_ref()
- .is_some_and(|diff| diff.is_upgradable());
- let has_uncommitted_diff = diff_state
- .uncommitted_diff
- .as_ref()
- .is_some_and(|set| set.is_upgradable());
-
- let update = (
- buffer,
- repo_path,
- has_unstaged_diff.then(|| diff_state.index_text.clone()),
- has_uncommitted_diff.then(|| diff_state.head_text.clone()),
- diff_state.hunk_staging_operation_count,
- );
- diff_state_updates.entry(repo).or_default().push(update);
- }
-
- if diff_state_updates.is_empty() {
- return;
- }
-
- for (repo, repo_diff_state_updates) in diff_state_updates.into_iter() {
- let git_store = cx.weak_entity();
-
- let _ = repo.update(cx, |repo, _| {
- repo.send_keyed_job(
- Some(GitJobKey::BatchReadIndex),
- None,
- |state, mut cx| async move {
- let RepositoryState::Local { backend, .. } = state else {
- log::error!("tried to recompute diffs for a non-local repository");
- return;
- };
- let mut diff_bases_changes_by_buffer = Vec::new();
- for (
- buffer,
- repo_path,
- current_index_text,
- current_head_text,
- hunk_staging_operation_count,
- ) in &repo_diff_state_updates
- {
- let index_text = if current_index_text.is_some() {
- backend.load_index_text(repo_path.clone()).await
- } else {
- None
- };
- let head_text = if current_head_text.is_some() {
- backend.load_committed_text(repo_path.clone()).await
- } else {
- None
- };
-
- // Avoid triggering a diff update if the base text has not changed.
- if let Some((current_index, current_head)) =
- current_index_text.as_ref().zip(current_head_text.as_ref())
- {
- if current_index.as_deref() == index_text.as_ref()
- && current_head.as_deref() == head_text.as_ref()
- {
- continue;
- }
- }
-
- let diff_bases_change =
- match (current_index_text.is_some(), current_head_text.is_some()) {
- (true, true) => Some(if index_text == head_text {
- DiffBasesChange::SetBoth(head_text)
- } else {
- DiffBasesChange::SetEach {
- index: index_text,
- head: head_text,
- }
- }),
- (true, false) => Some(DiffBasesChange::SetIndex(index_text)),
- (false, true) => Some(DiffBasesChange::SetHead(head_text)),
- (false, false) => None,
- };
-
- diff_bases_changes_by_buffer.push((
- buffer,
- diff_bases_change,
- *hunk_staging_operation_count,
- ))
- }
-
- git_store
- .update(&mut cx, |git_store, cx| {
- for (buffer, diff_bases_change, hunk_staging_operation_count) in
- diff_bases_changes_by_buffer
- {
- let Some(diff_state) =
- git_store.diffs.get(&buffer.read(cx).remote_id())
- else {
- continue;
- };
- let Some(diff_bases_change) = diff_bases_change else {
- continue;
- };
-
- let downstream_client = git_store.downstream_client();
- diff_state.update(cx, |diff_state, cx| {
- use proto::update_diff_bases::Mode;
-
- let buffer = buffer.read(cx);
- if let Some((client, project_id)) = downstream_client {
- let (staged_text, committed_text, mode) =
- match diff_bases_change.clone() {
- DiffBasesChange::SetIndex(index) => {
- (index, None, Mode::IndexOnly)
- }
- DiffBasesChange::SetHead(head) => {
- (None, head, Mode::HeadOnly)
- }
- DiffBasesChange::SetEach { index, head } => {
- (index, head, Mode::IndexAndHead)
- }
- DiffBasesChange::SetBoth(text) => {
- (None, text, Mode::IndexMatchesHead)
- }
- };
- let message = proto::UpdateDiffBases {
- project_id: project_id.to_proto(),
- buffer_id: buffer.remote_id().to_proto(),
- staged_text,
- committed_text,
- mode: mode as i32,
- };
-
- client.send(message).log_err();
- }
-
- let _ = diff_state.diff_bases_changed(
- buffer.text_snapshot(),
- diff_bases_change,
- hunk_staging_operation_count,
- cx,
- );
- });
- }
- })
- .ok();
- },
- )
+ for repository in self.repositories.values() {
+ repository.update(cx, |repository, cx| {
+ let repo_abs_path = &repository.work_directory_abs_path;
+ if changed_repos.iter().any(|update| {
+ update.old_work_directory_abs_path.as_ref() == Some(&repo_abs_path)
+ || update.new_work_directory_abs_path.as_ref() == Some(&repo_abs_path)
+ }) {
+ repository.reload_buffer_diff_bases(cx);
+ }
});
}
}
@@ -1775,12 +1628,28 @@ impl GitStore {
) -> Result<proto::Ack> {
let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
+ let repo_path = RepoPath::from_str(&envelope.payload.path);
+
+ let hunk_staging_operation_count = this
+ .read_with(&cx, |this, cx| {
+ let project_path = repository_handle
+ .read(cx)
+ .repo_path_to_project_path(&repo_path, cx)?;
+ let buffer_id = this
+ .buffer_store
+ .read(cx)
+ .buffer_id_for_project_path(&project_path)?;
+ let diff_state = this.diffs.get(buffer_id)?;
+ Some(diff_state.read(cx).hunk_staging_operation_count)
+ })?
+ .ok_or_else(|| anyhow!("unknown buffer"))?;
repository_handle
.update(&mut cx, |repository_handle, cx| {
repository_handle.spawn_set_index_text_job(
- RepoPath::from_str(&envelope.payload.path),
+ repo_path,
envelope.payload.text,
+ hunk_staging_operation_count,
cx,
)
})?
@@ -2176,6 +2045,8 @@ impl GitStore {
if let Some(buffer) = this.buffer_store.read(cx).get(buffer_id) {
let buffer = buffer.read(cx).text_snapshot();
diff_state.update(cx, |diff_state, cx| {
+ diff_state.hunk_staging_operation_count_as_of_read =
+ diff_state.hunk_staging_operation_count_as_of_write;
diff_state.handle_base_texts_updated(buffer, request.payload, cx);
})
}
@@ -2258,11 +2129,7 @@ impl BufferDiffState {
fn buffer_language_changed(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) {
self.language = buffer.read(cx).language().cloned();
self.language_changed = true;
- let _ = self.recalculate_diffs(
- buffer.read(cx).text_snapshot(),
- self.hunk_staging_operation_count,
- cx,
- );
+ let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx);
}
fn unstaged_diff(&self) -> Option<Entity<BufferDiff>> {
@@ -2295,46 +2162,47 @@ impl BufferDiffState {
},
};
- let _ = self.diff_bases_changed(
- buffer,
- diff_bases_change,
- self.hunk_staging_operation_count,
- cx,
- );
+ self.diff_bases_changed(buffer, Some(diff_bases_change), cx);
}
- pub fn wait_for_recalculation(&mut self) -> Option<oneshot::Receiver<()>> {
- if self.diff_updated_futures.is_empty() {
- return None;
+ pub fn wait_for_recalculation(&mut self) -> Option<impl Future<Output = ()> + use<>> {
+ if *self.recalculating_tx.borrow() {
+ let mut rx = self.recalculating_tx.subscribe();
+ return Some(async move {
+ loop {
+ let is_recalculating = rx.recv().await;
+ if is_recalculating != Some(true) {
+ break;
+ }
+ }
+ });
+ } else {
+ None
}
- let (tx, rx) = oneshot::channel();
- self.diff_updated_futures.push(tx);
- Some(rx)
}
fn diff_bases_changed(
&mut self,
buffer: text::BufferSnapshot,
- diff_bases_change: DiffBasesChange,
- prev_hunk_staging_operation_count: usize,
+ diff_bases_change: Option<DiffBasesChange>,
cx: &mut Context<Self>,
- ) -> oneshot::Receiver<()> {
+ ) {
match diff_bases_change {
- DiffBasesChange::SetIndex(index) => {
+ Some(DiffBasesChange::SetIndex(index)) => {
self.index_text = index.map(|mut index| {
text::LineEnding::normalize(&mut index);
Arc::new(index)
});
self.index_changed = true;
}
- DiffBasesChange::SetHead(head) => {
+ Some(DiffBasesChange::SetHead(head)) => {
self.head_text = head.map(|mut head| {
text::LineEnding::normalize(&mut head);
Arc::new(head)
});
self.head_changed = true;
}
- DiffBasesChange::SetBoth(text) => {
+ Some(DiffBasesChange::SetBoth(text)) => {
let text = text.map(|mut text| {
text::LineEnding::normalize(&mut text);
Arc::new(text)
@@ -2344,7 +2212,7 @@ impl BufferDiffState {
self.head_changed = true;
self.index_changed = true;
}
- DiffBasesChange::SetEach { index, head } => {
+ Some(DiffBasesChange::SetEach { index, head }) => {
self.index_text = index.map(|mut index| {
text::LineEnding::normalize(&mut index);
Arc::new(index)
@@ -2356,20 +2224,14 @@ impl BufferDiffState {
});
self.head_changed = true;
}
+ None => {}
}
- self.recalculate_diffs(buffer, prev_hunk_staging_operation_count, cx)
+ self.recalculate_diffs(buffer, cx)
}
- fn recalculate_diffs(
- &mut self,
- buffer: text::BufferSnapshot,
- prev_hunk_staging_operation_count: usize,
- cx: &mut Context<Self>,
- ) -> oneshot::Receiver<()> {
- log::debug!("recalculate diffs");
- let (tx, rx) = oneshot::channel();
- self.diff_updated_futures.push(tx);
+ fn recalculate_diffs(&mut self, buffer: text::BufferSnapshot, cx: &mut Context<Self>) {
+ *self.recalculating_tx.borrow_mut() = true;
let language = self.language.clone();
let language_registry = self.language_registry.clone();
@@ -2380,12 +2242,18 @@ impl BufferDiffState {
let index_changed = self.index_changed;
let head_changed = self.head_changed;
let language_changed = self.language_changed;
+ let prev_hunk_staging_operation_count = self.hunk_staging_operation_count_as_of_read;
let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) {
(Some(index), Some(head)) => Arc::ptr_eq(index, head),
(None, None) => true,
_ => false,
};
self.recalculate_diff_task = Some(cx.spawn(async move |this, cx| {
+ log::debug!(
+ "start recalculating diffs for buffer {}",
+ buffer.remote_id()
+ );
+
let mut new_unstaged_diff = None;
if let Some(unstaged_diff) = &unstaged_diff {
new_unstaged_diff = Some(
@@ -2424,10 +2292,28 @@ impl BufferDiffState {
}
}
- if this.update(cx, |this, _| {
- this.hunk_staging_operation_count > prev_hunk_staging_operation_count
- })? {
- eprintln!("early return");
+ let cancel = this.update(cx, |this, _| {
+ // This checks whether all pending stage/unstage operations
+ // have quiesced (i.e. both the corresponding write and the
+ // read of that write have completed). If not, then we cancel
+ // this recalculation attempt to avoid invalidating pending
+ // state too quickly; another recalculation will come along
+ // later and clear the pending state once the state of the index has settled.
+ if this.hunk_staging_operation_count > prev_hunk_staging_operation_count {
+ *this.recalculating_tx.borrow_mut() = false;
+ true
+ } else {
+ false
+ }
+ })?;
+ if cancel {
+ log::debug!(
+ concat!(
+ "aborting recalculating diffs for buffer {}",
+ "due to subsequent hunk operations",
+ ),
+ buffer.remote_id()
+ );
return Ok(());
}
@@ -2438,7 +2324,7 @@ impl BufferDiffState {
if language_changed {
diff.language_changed(cx);
}
- diff.set_snapshot(new_unstaged_diff, &buffer, None, cx)
+ diff.set_snapshot(new_unstaged_diff, &buffer, cx)
})?
} else {
None
@@ -2451,25 +2337,53 @@ impl BufferDiffState {
if language_changed {
diff.language_changed(cx);
}
- diff.set_snapshot(new_uncommitted_diff, &buffer, unstaged_changed_range, cx);
+ diff.set_snapshot_with_secondary(
+ new_uncommitted_diff,
+ &buffer,
+ unstaged_changed_range,
+ true,
+ cx,
+ );
})?;
}
+ log::debug!(
+ "finished recalculating diffs for buffer {}",
+ buffer.remote_id()
+ );
+
if let Some(this) = this.upgrade() {
this.update(cx, |this, _| {
this.index_changed = false;
this.head_changed = false;
this.language_changed = false;
- for tx in this.diff_updated_futures.drain(..) {
- tx.send(()).ok();
- }
+ *this.recalculating_tx.borrow_mut() = false;
})?;
}
Ok(())
}));
+ }
+}
- rx
+impl Default for BufferDiffState {
+ fn default() -> Self {
+ Self {
+ unstaged_diff: Default::default(),
+ uncommitted_diff: Default::default(),
+ recalculate_diff_task: Default::default(),
+ language: Default::default(),
+ language_registry: Default::default(),
+ recalculating_tx: postage::watch::channel_with(false).0,
+ hunk_staging_operation_count: 0,
+ hunk_staging_operation_count_as_of_read: 0,
+ hunk_staging_operation_count_as_of_write: 0,
+ head_text: Default::default(),
+ index_text: Default::default(),
+ head_changed: Default::default(),
+ index_changed: Default::default(),
+ language_changed: Default::default(),
+ }
}
}
@@ -2702,6 +2616,170 @@ impl Repository {
self.git_store.upgrade()
}
+ fn reload_buffer_diff_bases(&mut self, cx: &mut Context<Self>) {
+ let this = cx.weak_entity();
+ let git_store = self.git_store.clone();
+ let _ = self.send_keyed_job(
+ Some(GitJobKey::ReloadBufferDiffBases),
+ None,
+ |state, mut cx| async move {
+ let RepositoryState::Local { backend, .. } = state else {
+ log::error!("tried to recompute diffs for a non-local repository");
+ return Ok(());
+ };
+
+ let Some(this) = this.upgrade() else {
+ return Ok(());
+ };
+
+ let repo_diff_state_updates = this.update(&mut cx, |this, cx| {
+ git_store.update(cx, |git_store, cx| {
+ git_store
+ .diffs
+ .iter()
+ .filter_map(|(buffer_id, diff_state)| {
+ let buffer_store = git_store.buffer_store.read(cx);
+ let buffer = buffer_store.get(*buffer_id)?;
+ let file = File::from_dyn(buffer.read(cx).file())?;
+ let abs_path =
+ file.worktree.read(cx).absolutize(&file.path).ok()?;
+ let repo_path = this.abs_path_to_repo_path(&abs_path)?;
+ log::debug!(
+ "start reload diff bases for repo path {}",
+ repo_path.0.display()
+ );
+ diff_state.update(cx, |diff_state, _| {
+ diff_state.hunk_staging_operation_count_as_of_read =
+ diff_state.hunk_staging_operation_count_as_of_write;
+
+ let has_unstaged_diff = diff_state
+ .unstaged_diff
+ .as_ref()
+ .is_some_and(|diff| diff.is_upgradable());
+ let has_uncommitted_diff = diff_state
+ .uncommitted_diff
+ .as_ref()
+ .is_some_and(|set| set.is_upgradable());
+
+ Some((
+ buffer,
+ repo_path,
+ has_unstaged_diff.then(|| diff_state.index_text.clone()),
+ has_uncommitted_diff.then(|| diff_state.head_text.clone()),
+ ))
+ })
+ })
+ .collect::<Vec<_>>()
+ })
+ })??;
+
+ let buffer_diff_base_changes = cx
+ .background_spawn(async move {
+ let mut changes = Vec::new();
+ for (buffer, repo_path, current_index_text, current_head_text) in
+ &repo_diff_state_updates
+ {
+ let index_text = if current_index_text.is_some() {
+ backend.load_index_text(repo_path.clone()).await
+ } else {
+ None
+ };
+ let head_text = if current_head_text.is_some() {
+ backend.load_committed_text(repo_path.clone()).await
+ } else {
+ None
+ };
+
+ let change =
+ match (current_index_text.as_ref(), current_head_text.as_ref()) {
+ (Some(current_index), Some(current_head)) => {
+ let index_changed =
+ index_text.as_ref() != current_index.as_deref();
+ let head_changed =
+ head_text.as_ref() != current_head.as_deref();
+ if index_changed && head_changed {
+ if index_text == head_text {
+ Some(DiffBasesChange::SetBoth(head_text))
+ } else {
+ Some(DiffBasesChange::SetEach {
+ index: index_text,
+ head: head_text,
+ })
+ }
+ } else if index_changed {
+ Some(DiffBasesChange::SetIndex(index_text))
+ } else if head_changed {
+ Some(DiffBasesChange::SetHead(head_text))
+ } else {
+ None
+ }
+ }
+ (Some(current_index), None) => {
+ let index_changed =
+ index_text.as_ref() != current_index.as_deref();
+ index_changed
+ .then_some(DiffBasesChange::SetIndex(index_text))
+ }
+ (None, Some(current_head)) => {
+ let head_changed =
+ head_text.as_ref() != current_head.as_deref();
+ head_changed.then_some(DiffBasesChange::SetHead(head_text))
+ }
+ (None, None) => None,
+ };
+
+ changes.push((buffer.clone(), change))
+ }
+ changes
+ })
+ .await;
+
+ git_store.update(&mut cx, |git_store, cx| {
+ for (buffer, diff_bases_change) in buffer_diff_base_changes {
+ let buffer_snapshot = buffer.read(cx).text_snapshot();
+ let buffer_id = buffer_snapshot.remote_id();
+ let Some(diff_state) = git_store.diffs.get(&buffer_id) else {
+ continue;
+ };
+
+ let downstream_client = git_store.downstream_client();
+ diff_state.update(cx, |diff_state, cx| {
+ use proto::update_diff_bases::Mode;
+
+ if let Some((diff_bases_change, (client, project_id))) =
+ diff_bases_change.clone().zip(downstream_client)
+ {
+ let (staged_text, committed_text, mode) = match diff_bases_change {
+ DiffBasesChange::SetIndex(index) => {
+ (index, None, Mode::IndexOnly)
+ }
+ DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly),
+ DiffBasesChange::SetEach { index, head } => {
+ (index, head, Mode::IndexAndHead)
+ }
+ DiffBasesChange::SetBoth(text) => {
+ (None, text, Mode::IndexMatchesHead)
+ }
+ };
+ client
+ .send(proto::UpdateDiffBases {
+ project_id: project_id.to_proto(),
+ buffer_id: buffer_id.to_proto(),
+ staged_text,
+ committed_text,
+ mode: mode as i32,
+ })
+ .log_err();
+ }
+
+ diff_state.diff_bases_changed(buffer_snapshot, diff_bases_change, cx);
+ });
+ }
+ })
+ },
+ );
+ }
+
pub fn send_job<F, Fut, R>(
&mut self,
status: Option<SharedString>,
@@ -3416,14 +3494,17 @@ impl Repository {
&mut self,
path: RepoPath,
content: Option<String>,
- _cx: &mut App,
+ hunk_staging_operation_count: usize,
+ cx: &mut Context<Self>,
) -> oneshot::Receiver<anyhow::Result<()>> {
let id = self.id;
-
+ let this = cx.weak_entity();
+ let git_store = self.git_store.clone();
self.send_keyed_job(
Some(GitJobKey::WriteIndex(path.clone())),
None,
- move |git_repo, _cx| async move {
+ move |git_repo, mut cx| async move {
+ log::debug!("start updating index text for buffer {}", path.display());
match git_repo {
RepositoryState::Local {
backend,
@@ -3431,8 +3512,8 @@ impl Repository {
..
} => {
backend
- .set_index_text(path, content, environment.clone())
- .await
+ .set_index_text(path.clone(), content, environment.clone())
+ .await?;
}
RepositoryState::Remote { project_id, client } => {
client
@@ -3443,9 +3524,27 @@ impl Repository {
text: content,
})
.await?;
- Ok(())
}
}
+ log::debug!("finish updating index text for buffer {}", path.display());
+
+ let project_path = this
+ .read_with(&cx, |this, cx| this.repo_path_to_project_path(&path, cx))
+ .ok()
+ .flatten();
+ git_store.update(&mut cx, |git_store, cx| {
+ let buffer_id = git_store
+ .buffer_store
+ .read(cx)
+ .buffer_id_for_project_path(&project_path?)?;
+ let diff_state = git_store.diffs.get(buffer_id)?;
+ diff_state.update(cx, |diff_state, _| {
+ diff_state.hunk_staging_operation_count_as_of_write =
+ hunk_staging_operation_count;
+ });
+ Some(())
+ })?;
+ Ok(())
},
)
}
@@ -5,7 +5,8 @@ use crate::{
*,
};
use buffer_diff::{
- BufferDiffEvent, DiffHunkSecondaryStatus, DiffHunkStatus, DiffHunkStatusKind, assert_hunks,
+ BufferDiffEvent, CALCULATE_DIFF_TASK, DiffHunkSecondaryStatus, DiffHunkStatus,
+ DiffHunkStatusKind, assert_hunks,
};
use fs::FakeFs;
use futures::{StreamExt, future};
@@ -30,10 +31,11 @@ use parking_lot::Mutex;
use paths::{config_dir, tasks_file};
use postage::stream::Stream as _;
use pretty_assertions::{assert_eq, assert_matches};
+use rand::{Rng as _, rngs::StdRng};
use serde_json::json;
#[cfg(not(windows))]
use std::os;
-use std::{mem, num::NonZeroU32, ops::Range, str::FromStr, sync::OnceLock, task::Poll};
+use std::{env, mem, num::NonZeroU32, ops::Range, str::FromStr, sync::OnceLock, task::Poll};
use task::{ResolvedTask, TaskContext};
use unindent::Unindent as _;
use util::{
@@ -6605,7 +6607,7 @@ async fn test_staging_hunks(cx: &mut gpui::TestAppContext) {
} = event
{
let changed_range = changed_range.to_point(&snapshot);
- assert_eq!(changed_range, Point::new(0, 0)..Point::new(5, 0));
+ assert_eq!(changed_range, Point::new(0, 0)..Point::new(4, 0));
} else {
panic!("Unexpected event {event:?}");
}
@@ -6966,49 +6968,55 @@ async fn test_staging_hunks_with_delayed_fs_event(cx: &mut gpui::TestAppContext)
}
#[gpui::test]
-async fn test_staging_lots_of_hunks_fast(cx: &mut gpui::TestAppContext) {
+async fn test_staging_random_hunks(
+ mut rng: StdRng,
+ executor: BackgroundExecutor,
+ cx: &mut gpui::TestAppContext,
+) {
+ let operations = env::var("OPERATIONS")
+ .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
+ .unwrap_or(20);
+
+ // Try to induce races between diff recalculation and index writes.
+ if rng.gen_bool(0.5) {
+ executor.deprioritize(*CALCULATE_DIFF_TASK);
+ }
+
use DiffHunkSecondaryStatus::*;
init_test(cx);
- let different_lines = (0..500)
- .step_by(5)
- .map(|i| format!("diff {}\n", i))
- .collect::<Vec<String>>();
- let committed_contents = (0..500).map(|i| format!("{}\n", i)).collect::<String>();
- let file_contents = (0..500)
- .map(|i| {
- if i % 5 == 0 {
- different_lines[i / 5].clone()
- } else {
- format!("{}\n", i)
- }
+ let committed_text = (0..30).map(|i| format!("line {i}\n")).collect::<String>();
+ let index_text = committed_text.clone();
+ let buffer_text = (0..30)
+ .map(|i| match i % 5 {
+ 0 => format!("line {i} (modified)\n"),
+ _ => format!("line {i}\n"),
})
.collect::<String>();
let fs = FakeFs::new(cx.background_executor.clone());
fs.insert_tree(
- "/dir",
+ path!("/dir"),
json!({
".git": {},
- "file.txt": file_contents.clone()
+ "file.txt": buffer_text.clone()
}),
)
.await;
-
fs.set_head_for_repo(
- "/dir/.git".as_ref(),
- &[("file.txt".into(), committed_contents.clone())],
+ path!("/dir/.git").as_ref(),
+ &[("file.txt".into(), committed_text.clone())],
);
fs.set_index_for_repo(
- "/dir/.git".as_ref(),
- &[("file.txt".into(), committed_contents.clone())],
+ path!("/dir/.git").as_ref(),
+ &[("file.txt".into(), index_text.clone())],
);
+ let repo = fs.open_repo(path!("/dir/.git").as_ref()).unwrap();
- let project = Project::test(fs.clone(), ["/dir".as_ref()], cx).await;
-
+ let project = Project::test(fs.clone(), [path!("/dir").as_ref()], cx).await;
let buffer = project
.update(cx, |project, cx| {
- project.open_local_buffer("/dir/file.txt", cx)
+ project.open_local_buffer(path!("/dir/file.txt"), cx)
})
.await
.unwrap();
@@ -7020,94 +7028,60 @@ async fn test_staging_lots_of_hunks_fast(cx: &mut gpui::TestAppContext) {
.await
.unwrap();
- let mut expected_hunks: Vec<(Range<u32>, String, String, DiffHunkStatus)> = (0..500)
- .step_by(5)
- .map(|i| {
- (
- i as u32..i as u32 + 1,
- format!("{}\n", i),
- different_lines[i / 5].clone(),
- DiffHunkStatus::modified(HasSecondaryHunk),
- )
- })
- .collect();
+ let mut hunks =
+ uncommitted_diff.update(cx, |diff, cx| diff.hunks(&snapshot, cx).collect::<Vec<_>>());
+ assert_eq!(hunks.len(), 6);
- // The hunks are initially unstaged
- uncommitted_diff.read_with(cx, |diff, cx| {
- assert_hunks(
- diff.hunks(&snapshot, cx),
- &snapshot,
- &diff.base_text_string().unwrap(),
- &expected_hunks,
- );
- });
+ for _i in 0..operations {
+ let hunk_ix = rng.gen_range(0..hunks.len());
+ let hunk = &mut hunks[hunk_ix];
+ let row = hunk.range.start.row;
- for (_, _, _, status) in expected_hunks.iter_mut() {
- *status = DiffHunkStatus::modified(SecondaryHunkRemovalPending);
- }
-
- // Stage every hunk with a different call
- uncommitted_diff.update(cx, |diff, cx| {
- let hunks = diff.hunks(&snapshot, cx).collect::<Vec<_>>();
- for hunk in hunks {
- diff.stage_or_unstage_hunks(true, &[hunk], &snapshot, true, cx);
+ if hunk.status().has_secondary_hunk() {
+ log::info!("staging hunk at {row}");
+ uncommitted_diff.update(cx, |diff, cx| {
+ diff.stage_or_unstage_hunks(true, &[hunk.clone()], &snapshot, true, cx);
+ });
+ hunk.secondary_status = SecondaryHunkRemovalPending;
+ } else {
+ log::info!("unstaging hunk at {row}");
+ uncommitted_diff.update(cx, |diff, cx| {
+ diff.stage_or_unstage_hunks(false, &[hunk.clone()], &snapshot, true, cx);
+ });
+ hunk.secondary_status = SecondaryHunkAdditionPending;
}
- assert_hunks(
- diff.hunks(&snapshot, cx),
- &snapshot,
- &diff.base_text_string().unwrap(),
- &expected_hunks,
- );
- });
-
- // If we wait, we'll have no pending hunks
- cx.run_until_parked();
- for (_, _, _, status) in expected_hunks.iter_mut() {
- *status = DiffHunkStatus::modified(NoSecondaryHunk);
+ for _ in 0..rng.gen_range(0..10) {
+ log::info!("yielding");
+ cx.executor().simulate_random_delay().await;
+ }
}
- uncommitted_diff.update(cx, |diff, cx| {
- assert_hunks(
- diff.hunks(&snapshot, cx),
- &snapshot,
- &diff.base_text_string().unwrap(),
- &expected_hunks,
- );
- });
-
- for (_, _, _, status) in expected_hunks.iter_mut() {
- *status = DiffHunkStatus::modified(SecondaryHunkAdditionPending);
- }
+ cx.executor().run_until_parked();
- // Unstage every hunk with a different call
- uncommitted_diff.update(cx, |diff, cx| {
- let hunks = diff.hunks(&snapshot, cx).collect::<Vec<_>>();
- for hunk in hunks {
- diff.stage_or_unstage_hunks(false, &[hunk], &snapshot, true, cx);
+ for hunk in &mut hunks {
+ if hunk.secondary_status == SecondaryHunkRemovalPending {
+ hunk.secondary_status = NoSecondaryHunk;
+ } else if hunk.secondary_status == SecondaryHunkAdditionPending {
+ hunk.secondary_status = HasSecondaryHunk;
}
-
- assert_hunks(
- diff.hunks(&snapshot, cx),
- &snapshot,
- &diff.base_text_string().unwrap(),
- &expected_hunks,
- );
- });
-
- // If we wait, we'll have no pending hunks, again
- cx.run_until_parked();
- for (_, _, _, status) in expected_hunks.iter_mut() {
- *status = DiffHunkStatus::modified(HasSecondaryHunk);
}
+ log::info!(
+ "index text:\n{}",
+ repo.load_index_text("file.txt".into()).await.unwrap()
+ );
+
uncommitted_diff.update(cx, |diff, cx| {
- assert_hunks(
- diff.hunks(&snapshot, cx),
- &snapshot,
- &diff.base_text_string().unwrap(),
- &expected_hunks,
- );
+ let expected_hunks = hunks
+ .iter()
+ .map(|hunk| (hunk.range.start.row, hunk.secondary_status))
+ .collect::<Vec<_>>();
+ let actual_hunks = diff
+ .hunks(&snapshot, cx)
+ .map(|hunk| (hunk.range.start.row, hunk.secondary_status))
+ .collect::<Vec<_>>();
+ assert_eq!(actual_hunks, expected_hunks);
});
}