acp_thread: Clear running_turn when prompt task drops tx (#55562)

Bruno Moreira created

`AcpThread::status` is purely `running_turn.is_some()`. The cleanup that
takes `running_turn` sat below the early-return guard that fires when
the prompt response oneshot resolves to `Err(Cancelled)` (the inner
`send_task` was dropped before `tx.send`). Any code path that drops the
in-flight `send_task` therefore left the panel stuck in `Generating`.
Reordered so cleanup runs before the dropped-tx guard; the same-turn
invariant is preserved.

Related to #47928 (partial — that issue also has an upstream
`claude-agent-acp` component this PR does not address).

Self-Review Checklist:

- [x] I've reviewed my own diff for quality, security, and reliability
- [x] Unsafe blocks (if any) have justifying comments
- [x] The content is consistent with the [UI/UX
checklist](https://github.com/zed-industries/zed/blob/main/CONTRIBUTING.md#uiux-checklist)
- [x] Tests cover the new/changed behavior
- [x] Performance impact has been considered and is acceptable

Release Notes:

- Fixed agent panel staying in a generating state when the underlying
prompt task was cancelled before completing

Change summary

crates/acp_thread/src/acp_thread.rs | 72 ++++++++++++++++++++++++++++--
1 file changed, 67 insertions(+), 5 deletions(-)

Detailed changes

crates/acp_thread/src/acp_thread.rs 🔗

@@ -2294,10 +2294,6 @@ impl AcpThread {
                     this.project
                         .update(cx, |project, cx| project.set_agent_location(None, cx));
                 }
-                let Ok(response) = response else {
-                    // tx dropped, just return
-                    return Ok(None);
-                };
 
                 let is_same_turn = this
                     .running_turn
@@ -2306,11 +2302,18 @@ impl AcpThread {
 
                 // If the user submitted a follow up message, running_turn might
                 // already point to a different turn. Therefore we only want to
-                // take the task if it's the same turn.
+                // take the task if it's the same turn. We do this before the
+                // dropped-tx guard below so the panel exits its generating
+                // state even when the send_task is cancelled before tx.send().
                 if is_same_turn {
                     this.running_turn.take();
                 }
 
+                let Ok(response) = response else {
+                    // tx dropped, just return
+                    return Ok(None);
+                };
+
                 match response {
                     Ok(r) => {
                         Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);
@@ -5517,4 +5520,63 @@ mod tests {
             );
         });
     }
+
+    /// Regression test: if the inner send_task is cancelled before it can
+    /// fire `tx.send(...)` (e.g. because the underlying future was dropped),
+    /// the outer task observes `rx.await` returning `Err(Cancelled)` and
+    /// must still clear `running_turn` so the panel transitions out of
+    /// `Generating`. Without this, the agent thread is wedged in the
+    /// loading state until Zed restarts.
+    #[gpui::test]
+    async fn test_running_turn_cleared_when_send_task_dropped(cx: &mut TestAppContext) {
+        init_test(cx);
+
+        let fs = FakeFs::new(cx.executor());
+        let project = Project::test(fs, [], cx).await;
+
+        // Handler hangs forever so the spawn at run_turn is parked inside
+        // `f(this, cx).await` with `tx` still alive but unsent.
+        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
+            |_params, _thread, _cx| {
+                async move { futures::future::pending::<Result<acp::PromptResponse>>().await }
+                    .boxed_local()
+            },
+        ));
+
+        let thread = cx
+            .update(|cx| {
+                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
+            })
+            .await
+            .unwrap();
+
+        let request = thread.update(cx, |thread, cx| thread.send_raw("hello", cx));
+        cx.run_until_parked();
+
+        assert_eq!(
+            thread.read_with(cx, |t, _| t.status()),
+            ThreadStatus::Generating,
+            "thread should be generating while the handler is parked"
+        );
+
+        // Replace the in-flight send_task with a no-op. Dropping the original
+        // Task cancels its inner future, which drops `tx` without ever calling
+        // `tx.send(...)`. This mirrors the production scenario where the
+        // send_task future is cancelled before completion.
+        thread.update(cx, |thread, _| {
+            thread.running_turn.as_mut().unwrap().send_task = Task::ready(());
+        });
+
+        let result = request.await;
+        assert!(
+            matches!(result, Ok(None)),
+            "outer task should resolve to Ok(None) on dropped tx, got {result:?}"
+        );
+
+        assert_eq!(
+            thread.read_with(cx, |t, _| t.status()),
+            ThreadStatus::Idle,
+            "running_turn must be cleared even when tx was dropped without send"
+        );
+    }
 }