language_models: Activate StreamEnded support in completion streams for Cloud provider (#49337)

Tom Houlé created

Now that the cloud-side StreamEnded support has landed, enable it on the
Zed client:

- Send the
`x-zed-client-supports-stream-ended-request-completion-status` header so
cloud knows to send `StreamEnded` status events.
- Track whether `StreamEnded` was received during the stream.
- Emit a `StreamEndedUnexpectedly` error when the stream closes without
a `StreamEnded` status, indicating the response was truncated.

Concretely, when a response stream for a model call made through a Zed
subscription terminates abnormally, the agent panel now displays an
error instead of just stopping.

Release Notes:

- The agent panel now displays an error when a response stream for a
model call made through a Zed subscription terminates abnormally,
instead of just stopping.

Change summary

crates/language_models/src/provider/cloud.rs | 34 +++++++++------------
1 file changed, 14 insertions(+), 20 deletions(-)

Detailed changes

crates/language_models/src/provider/cloud.rs 🔗

@@ -5,9 +5,10 @@ use chrono::{DateTime, Utc};
 use client::{Client, UserStore, zed_urls};
 use cloud_api_types::Plan;
 use cloud_llm_client::{
-    CLIENT_SUPPORTS_STATUS_MESSAGES_HEADER_NAME, CLIENT_SUPPORTS_X_AI_HEADER_NAME, CompletionBody,
-    CompletionEvent, CompletionRequestStatus, CountTokensBody, CountTokensResponse,
-    ListModelsResponse, SERVER_SUPPORTS_STATUS_MESSAGES_HEADER_NAME, ZED_VERSION_HEADER_NAME,
+    CLIENT_SUPPORTS_STATUS_MESSAGES_HEADER_NAME, CLIENT_SUPPORTS_STATUS_STREAM_ENDED_HEADER_NAME,
+    CLIENT_SUPPORTS_X_AI_HEADER_NAME, CompletionBody, CompletionEvent, CompletionRequestStatus,
+    CountTokensBody, CountTokensResponse, ListModelsResponse,
+    SERVER_SUPPORTS_STATUS_MESSAGES_HEADER_NAME, ZED_VERSION_HEADER_NAME,
 };
 use futures::{
     AsyncBufReadExt, FutureExt, Stream, StreamExt,
@@ -397,8 +398,7 @@ impl CloudLanguageModel {
                 .header("Content-Type", "application/json")
                 .header("Authorization", format!("Bearer {token}"))
                 .header(CLIENT_SUPPORTS_STATUS_MESSAGES_HEADER_NAME, "true")
-                // TODO: Uncomment once the cloud-side StreamEnded support PR is merged.
-                // .header(CLIENT_SUPPORTS_STATUS_STREAM_ENDED_HEADER_NAME, "true")
+                .header(CLIENT_SUPPORTS_STATUS_STREAM_ENDED_HEADER_NAME, "true")
                 .body(serde_json::to_string(&body)?.into())?;
 
             let mut response = http_client.send(request).await?;
@@ -938,8 +938,7 @@ where
     let provider = provider.clone();
     let mut stream = stream.fuse();
 
-    // TODO: Uncomment once the cloud-side StreamEnded support PR is merged.
-    // let mut saw_stream_ended = false;
+    let mut saw_stream_ended = false;
 
     let mut done = false;
     let mut pending = VecDeque::new();
@@ -961,10 +960,7 @@ where
                             vec![Err(LanguageModelCompletionError::from(error))]
                         }
                         Ok(CompletionEvent::Status(CompletionRequestStatus::StreamEnded)) => {
-                            // TODO: Uncomment once the cloud-side StreamEnded support PR is merged.
-                            // let mut saw_stream_ended = false;
-                            //
-                            // saw_stream_ended = true;
+                            saw_stream_ended = true;
                             vec![]
                         }
                         Ok(CompletionEvent::Status(status)) => {
@@ -983,15 +979,13 @@ where
                 Poll::Ready(None) => {
                     done = true;
 
-                    // TODO: Uncomment once the cloud-side StreamEnded support PR is merged.
-                    //
-                    // if !saw_stream_ended {
-                    //     return Poll::Ready(Some(Err(
-                    //         LanguageModelCompletionError::StreamEndedUnexpectedly {
-                    //             provider: provider.clone(),
-                    //         },
-                    //     )));
-                    // }
+                    if !saw_stream_ended {
+                        return Poll::Ready(Some(Err(
+                            LanguageModelCompletionError::StreamEndedUnexpectedly {
+                                provider: provider.clone(),
+                            },
+                        )));
+                    }
                 }
                 Poll::Pending => return Poll::Pending,
             }