Fix race condition in `update_last_checkpoint` (#44801)

Nathan Sobo created

Release Notes:

- Fixed spurious "no checkpoint" error in agent panel

---

## Summary

`update_last_checkpoint` would call `last_user_message()` twice - once
at the start to capture the checkpoint, and again in an async closure
after the checkpoint comparison completed. If a new user message without
a checkpoint was added between these two calls, the second call would
find the new message and fail with "no checkpoint".

## Fix

Capture the user message ID at the start and use `user_message_mut(&id)`
in the async closure to find the specific message.

cc @mikayla-maki

Change summary

crates/acp_thread/src/acp_thread.rs | 115 ++++++++++++++++++++++++------
1 file changed, 91 insertions(+), 24 deletions(-)

Detailed changes

crates/acp_thread/src/acp_thread.rs 🔗

@@ -1993,37 +1993,42 @@ impl AcpThread {
     fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
         let git_store = self.project.read(cx).git_store().clone();
 
-        let old_checkpoint = if let Some((_, message)) = self.last_user_message() {
-            if let Some(checkpoint) = message.checkpoint.as_ref() {
-                checkpoint.git_checkpoint.clone()
-            } else {
-                return Task::ready(Ok(()));
-            }
-        } else {
+        let Some((_, message)) = self.last_user_message() else {
+            return Task::ready(Ok(()));
+        };
+        let Some(user_message_id) = message.id.clone() else {
+            return Task::ready(Ok(()));
+        };
+        let Some(checkpoint) = message.checkpoint.as_ref() else {
             return Task::ready(Ok(()));
         };
+        let old_checkpoint = checkpoint.git_checkpoint.clone();
 
         let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
         cx.spawn(async move |this, cx| {
-            let new_checkpoint = new_checkpoint
+            let Some(new_checkpoint) = new_checkpoint
                 .await
                 .context("failed to get new checkpoint")
-                .log_err();
-            if let Some(new_checkpoint) = new_checkpoint {
-                let equal = git_store
-                    .update(cx, |git, cx| {
-                        git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
-                    })?
-                    .await
-                    .unwrap_or(true);
-                this.update(cx, |this, cx| {
-                    let (ix, message) = this.last_user_message().context("no user message")?;
-                    let checkpoint = message.checkpoint.as_mut().context("no checkpoint")?;
-                    checkpoint.show = !equal;
-                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
-                    anyhow::Ok(())
-                })??;
-            }
+                .log_err()
+            else {
+                return Ok(());
+            };
+
+            let equal = git_store
+                .update(cx, |git, cx| {
+                    git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
+                })?
+                .await
+                .unwrap_or(true);
+
+            this.update(cx, |this, cx| {
+                if let Some((ix, message)) = this.user_message_mut(&user_message_id) {
+                    if let Some(checkpoint) = message.checkpoint.as_mut() {
+                        checkpoint.show = !equal;
+                        cx.emit(AcpThreadEvent::EntryUpdated(ix));
+                    }
+                }
+            })?;
 
             Ok(())
         })
@@ -4069,4 +4074,66 @@ mod tests {
             "Should have exactly 2 terminals (the completed ones from before checkpoint)"
         );
     }
+
+    /// Tests that update_last_checkpoint correctly updates the original message's checkpoint
+    /// even when a new user message is added while the async checkpoint comparison is in progress.
+    ///
+    /// This is a regression test for a bug where update_last_checkpoint would fail with
+    /// "no checkpoint" if a new user message (without a checkpoint) was added between when
+    /// update_last_checkpoint started and when its async closure ran.
+    #[gpui::test]
+    async fn test_update_last_checkpoint_with_new_message_added(cx: &mut TestAppContext) {
+        init_test(cx);
+
+        let fs = FakeFs::new(cx.executor());
+        fs.insert_tree(path!("/test"), json!({".git": {}, "file.txt": "content"}))
+            .await;
+        let project = Project::test(fs.clone(), [Path::new(path!("/test"))], cx).await;
+
+        let handler_done = Arc::new(AtomicBool::new(false));
+        let handler_done_clone = handler_done.clone();
+        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
+            move |_, _thread, _cx| {
+                handler_done_clone.store(true, SeqCst);
+                async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }.boxed_local()
+            },
+        ));
+
+        let thread = cx
+            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
+            .await
+            .unwrap();
+
+        let send_future = thread.update(cx, |thread, cx| thread.send_raw("First message", cx));
+        let send_task = cx.background_executor.spawn(send_future);
+
+        // Tick until handler completes, then a few more to let update_last_checkpoint start
+        while !handler_done.load(SeqCst) {
+            cx.executor().tick();
+        }
+        for _ in 0..5 {
+            cx.executor().tick();
+        }
+
+        thread.update(cx, |thread, cx| {
+            thread.push_entry(
+                AgentThreadEntry::UserMessage(UserMessage {
+                    id: Some(UserMessageId::new()),
+                    content: ContentBlock::Empty,
+                    chunks: vec!["Injected message (no checkpoint)".into()],
+                    checkpoint: None,
+                }),
+                cx,
+            );
+        });
+
+        cx.run_until_parked();
+        let result = send_task.await;
+
+        assert!(
+            result.is_ok(),
+            "send should succeed even when new message added during update_last_checkpoint: {:?}",
+            result.err()
+        );
+    }
 }