@@ -2381,3 +2381,106 @@ async fn test_following_while_deactivated(cx_a: &mut TestAppContext, cx_b: &mut
assert_eq!(editor.tab_content_text(0, cx), "2.js");
});
}
+
+#[gpui::test(iterations = 10)]
+async fn test_following_with_multibuffer_excerpts_at_unobserved_lamport(
+ cx_a: &mut TestAppContext,
+ cx_b: &mut TestAppContext,
+) {
+ let executor = cx_a.executor();
+ let mut server = TestServer::start(executor.clone()).await;
+ let client_a = server.create_client(cx_a, "user_a").await;
+ let client_b = server.create_client(cx_b, "user_b").await;
+ server
+ .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)])
+ .await;
+
+ cx_a.update(editor::init);
+ cx_b.update(editor::init);
+
+ let active_call_a = cx_a.read(ActiveCall::global);
+ let active_call_b = cx_b.read(ActiveCall::global);
+
+ client_a
+ .fs()
+ .insert_tree(path!("/a"), json!({ "1.txt": sample_text(20, 5, 'a') }))
+ .await;
+ let (project_a, worktree_id) = client_a.build_local_project(path!("/a"), cx_a).await;
+ active_call_a
+ .update(cx_a, |call, cx| call.set_location(Some(&project_a), cx))
+ .await
+ .unwrap();
+ let project_id = active_call_a
+ .update(cx_a, |call, cx| call.share_project(project_a.clone(), cx))
+ .await
+ .unwrap();
+ let project_b = client_b.join_remote_project(project_id, cx_b).await;
+ active_call_b
+ .update(cx_b, |call, cx| call.set_location(Some(&project_b), cx))
+ .await
+ .unwrap();
+
+ let (workspace_a, cx_a) = client_a.build_workspace(&project_a, cx_a);
+ let (workspace_b, cx_b) = client_b.build_workspace(&project_b, cx_b);
+
+ let buffer_a = project_a
+ .update(cx_a, |p, cx| {
+ p.open_buffer((worktree_id, rel_path("1.txt")), cx)
+ })
+ .await
+ .unwrap();
+ // B must already have the buffer open at a low Lamport so that A's
+ // subsequent edits create anchors B hasn't observed.
+ let _buffer_b = project_b
+ .update(cx_b, |p, cx| {
+ p.open_buffer((worktree_id, rel_path("1.txt")), cx)
+ })
+ .await
+ .unwrap();
+
+ workspace_b.update_in(cx_b, |workspace, window, cx| {
+ workspace.follow(client_a.peer_id().unwrap(), window, cx)
+ });
+ executor.run_until_parked();
+
+ buffer_a.update(cx_a, |buf, cx| {
+ for i in 0..30 {
+ let len = buf.len();
+ buf.edit([(len..len, format!("\nappended line {i}"))], None, cx);
+ }
+ });
+ let multibuffer_a = cx_a.new(|cx| {
+ let mut mb = MultiBuffer::new(Capability::ReadWrite);
+ let max_row = buffer_a.read(cx).max_point().row;
+ mb.set_excerpts_for_path(
+ PathKey::for_buffer(&buffer_a, cx),
+ buffer_a.clone(),
+ [Point::row_range(max_row.saturating_sub(5)..max_row)],
+ 1,
+ cx,
+ );
+ mb
+ });
+ workspace_a.update_in(cx_a, |workspace, window, cx| {
+ let editor = cx
+ .new(|cx| Editor::for_multibuffer(multibuffer_a, Some(project_a.clone()), window, cx));
+ workspace.add_item_to_active_pane(Box::new(editor), None, true, window, cx);
+ });
+
+ executor.run_until_parked();
+
+ let active_text = |workspace: &Entity<Workspace>, cx: &mut VisualTestContext| {
+ workspace.update(cx, |workspace, cx| {
+ workspace
+ .active_item(cx)
+ .unwrap()
+ .downcast::<Editor>()
+ .unwrap()
+ .update(cx, |editor, cx| editor.text(cx))
+ })
+ };
+ assert_eq!(
+ active_text(&workspace_a, cx_a),
+ active_text(&workspace_b, cx_b)
+ );
+}
@@ -101,6 +101,10 @@ impl FollowableItem for Editor {
.await
.debug_assert_ok("leaders don't share views for unshared buffers")?;
+ let path_excerpts =
+ deserialize_path_excerpts_and_wait_for_anchors(state.path_excerpts, &buffers, cx)
+ .await?;
+
let editor = cx.update(|window, cx| {
let multibuffer = cx.new(|cx| {
let mut multibuffer;
@@ -108,27 +112,13 @@ impl FollowableItem for Editor {
multibuffer = MultiBuffer::singleton(buffers.pop().unwrap(), cx)
} else {
multibuffer = MultiBuffer::new(project.read(cx).capability());
- for path_with_ranges in state.path_excerpts {
- let Some(path_key) =
- path_with_ranges.path_key.and_then(deserialize_path_key)
- else {
- continue;
- };
- let Some(buffer_id) = BufferId::new(path_with_ranges.buffer_id).ok()
- else {
- continue;
- };
+ for (path_key, buffer_id, ranges) in path_excerpts {
let Some(buffer) =
buffers.iter().find(|b| b.read(cx).remote_id() == buffer_id)
else {
continue;
};
let buffer_snapshot = buffer.read(cx).snapshot();
- let ranges = path_with_ranges
- .ranges
- .into_iter()
- .filter_map(deserialize_excerpt_range)
- .collect::<Vec<_>>();
multibuffer.update_path_excerpts(
path_key,
buffer.clone(),
@@ -402,25 +392,20 @@ async fn update_editor_from_message(
.map(|id| BufferId::new(id).map(|id| project.open_buffer_by_id(id, cx)))
.collect::<Result<Vec<_>>>()
})?;
- let _inserted_excerpt_buffers = try_join_all(inserted_excerpt_buffers).await?;
+ let inserted_excerpt_buffers = try_join_all(inserted_excerpt_buffers).await?;
+
+ let updated_paths = deserialize_path_excerpts_and_wait_for_anchors(
+ message.updated_paths,
+ &inserted_excerpt_buffers,
+ cx,
+ )
+ .await?;
// Update the editor's excerpts.
let buffer_snapshot = this.update(cx, |editor, cx| {
editor.buffer.update(cx, |multibuffer, cx| {
- for path_with_excerpts in message.updated_paths {
- let Some(path_key) = path_with_excerpts.path_key.and_then(deserialize_path_key)
- else {
- continue;
- };
- let ranges = path_with_excerpts
- .ranges
- .into_iter()
- .filter_map(deserialize_excerpt_range)
- .collect::<Vec<_>>();
- let Some(buffer) = BufferId::new(path_with_excerpts.buffer_id)
- .ok()
- .and_then(|buffer_id| project.read(cx).buffer_for_id(buffer_id, cx))
- else {
+ for (path_key, buffer_id, ranges) in updated_paths {
+ let Some(buffer) = project.read(cx).buffer_for_id(buffer_id, cx) else {
continue;
};
@@ -539,6 +524,56 @@ fn serialize_excerpt_range(range: ExcerptRange<language::Anchor>) -> proto::Exce
}
}
+async fn deserialize_path_excerpts_and_wait_for_anchors(
+ path_excerpts: Vec<proto::PathExcerpts>,
+ buffers: &[Entity<Buffer>],
+ cx: &mut AsyncWindowContext,
+) -> Result<Vec<(PathKey, BufferId, Vec<ExcerptRange<language::Anchor>>)>> {
+ let path_excerpts = path_excerpts
+ .into_iter()
+ .filter_map(|path_with_ranges| {
+ let path_key = path_with_ranges.path_key.and_then(deserialize_path_key)?;
+ let buffer_id = BufferId::new(path_with_ranges.buffer_id).ok()?;
+ let ranges = path_with_ranges
+ .ranges
+ .into_iter()
+ .filter_map(deserialize_excerpt_range)
+ .collect::<Vec<_>>();
+ Some((path_key, buffer_id, ranges))
+ })
+ .collect::<Vec<_>>();
+
+ let wait_for_anchors = cx.update(|_, cx| {
+ buffers
+ .iter()
+ .map(|buffer| {
+ let buffer_id = buffer.read(cx).remote_id();
+ let anchors = path_excerpts
+ .iter()
+ .filter(|(_, id, _)| *id == buffer_id)
+ .flat_map(|(_, _, ranges)| {
+ ranges.iter().flat_map(|range| {
+ [
+ range.context.start,
+ range.context.end,
+ range.primary.start,
+ range.primary.end,
+ ]
+ })
+ })
+ .collect::<Vec<_>>();
+ buffer.update(cx, |buffer, _| buffer.wait_for_anchors(anchors))
+ })
+ .collect::<Vec<_>>()
+ })?;
+ // Without this wait, resolving these anchors later can race ahead of the
+ // leader's pending buffer ops and trip `panic_bad_anchor` on a stale
+ // snapshot.
+ try_join_all(wait_for_anchors).await?;
+
+ Ok(path_excerpts)
+}
+
fn deserialize_excerpt_range(
excerpt_range: proto::ExcerptRange,
) -> Option<ExcerptRange<language::Anchor>> {