1mod create_file_parser;
2mod edit_parser;
3#[cfg(test)]
4mod evals;
5mod streaming_fuzzy_matcher;
6
7use crate::{Template, Templates};
8use action_log::ActionLog;
9use anyhow::Result;
10use cloud_llm_client::CompletionIntent;
11use create_file_parser::{CreateFileParser, CreateFileParserEvent};
12pub use edit_parser::EditFormat;
13use edit_parser::{EditParser, EditParserEvent, EditParserMetrics};
14use futures::{
15 Stream, StreamExt,
16 channel::mpsc::{self, UnboundedReceiver},
17 pin_mut,
18 stream::BoxStream,
19};
20use gpui::{AppContext, AsyncApp, Entity, Task};
21use language::{Anchor, Buffer, BufferSnapshot, LineIndent, Point, TextBufferSnapshot};
22use language_model::{
23 LanguageModel, LanguageModelCompletionError, LanguageModelRequest, LanguageModelRequestMessage,
24 LanguageModelToolChoice, MessageContent, Role,
25};
26use project::{AgentLocation, Project};
27use schemars::JsonSchema;
28use serde::{Deserialize, Serialize};
29use std::{cmp, iter, mem, ops::Range, path::PathBuf, pin::Pin, sync::Arc, task::Poll};
30use streaming_diff::{CharOperation, StreamingDiff};
31use streaming_fuzzy_matcher::StreamingFuzzyMatcher;
32
/// Data rendered into the `create_file_prompt.hbs` template, which produces
/// the prompt used by [`EditAgent::overwrite`].
#[derive(Serialize)]
struct CreateFilePromptTemplate {
    /// Path resolved from the buffer snapshot, if it has one.
    path: Option<PathBuf>,
    /// Caller-supplied description of the content to write.
    edit_description: String,
}

impl Template for CreateFilePromptTemplate {
    const TEMPLATE_NAME: &'static str = "create_file_prompt.hbs";
}
42
/// Data rendered into the `edit_file_prompt_xml.hbs` template, which produces
/// the prompt used by [`EditAgent::edit`] when the edit format is
/// `EditFormat::XmlTags`.
#[derive(Serialize)]
struct EditFileXmlPromptTemplate {
    /// Path resolved from the buffer snapshot, if it has one.
    path: Option<PathBuf>,
    /// Caller-supplied description of the edit to perform.
    edit_description: String,
}

impl Template for EditFileXmlPromptTemplate {
    const TEMPLATE_NAME: &'static str = "edit_file_prompt_xml.hbs";
}
52
/// Data rendered into the `edit_file_prompt_diff_fenced.hbs` template, which
/// produces the prompt used by [`EditAgent::edit`] when the edit format is
/// `EditFormat::DiffFenced`.
#[derive(Serialize)]
struct EditFileDiffFencedPromptTemplate {
    /// Path resolved from the buffer snapshot, if it has one.
    path: Option<PathBuf>,
    /// Caller-supplied description of the edit to perform.
    edit_description: String,
}

impl Template for EditFileDiffFencedPromptTemplate {
    const TEMPLATE_NAME: &'static str = "edit_file_prompt_diff_fenced.hbs";
}
62
/// Progress events sent on the channel returned alongside the task by
/// [`EditAgent::edit`] and [`EditAgent::overwrite`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum EditAgentOutputEvent {
    /// A candidate range for the old text of the edit currently being
    /// resolved. Sent each time the matcher reports a range, so it may be
    /// emitted several times for the same edit as more old text streams in.
    ResolvingEditRange(Range<Anchor>),
    /// The old text of an edit could not be resolved to any range in the
    /// buffer; that edit is skipped.
    UnresolvedEditRange,
    /// The old text of an edit resolved to more than one range; that edit is
    /// skipped. Each entry is `start_line..end_line` for one candidate, using
    /// 1-based line numbers.
    AmbiguousEditRange(Vec<Range<usize>>),
    /// The buffer was modified (cleared, appended to, or edited).
    Edited,
}
70
/// Final result of an [`EditAgent::edit`] or [`EditAgent::overwrite`] run.
#[derive(Clone, Debug, Serialize, Deserialize, JsonSchema)]
pub struct EditAgentOutput {
    /// Concatenation of every text chunk received from the model, unparsed.
    pub raw_edits: String,
    /// Metrics returned by the edit parser when it finished. For overwrites
    /// (which use the create-file parser) this is always the default value.
    pub parser_metrics: EditParserMetrics,
}
76
/// Requests edits from a language model and streams them into a buffer.
///
/// Cloning is used internally to move the agent into spawned tasks.
#[derive(Clone)]
pub struct EditAgent {
    /// Model the completion requests are sent to.
    model: Arc<dyn LanguageModel>,
    /// Notified when a buffer is created, read or edited by the agent.
    action_log: Entity<ActionLog>,
    /// Receives the agent's current location as edits are resolved/applied.
    project: Entity<Project>,
    /// Template registry used to render the prompts.
    templates: Arc<Templates>,
    /// Selects the prompt template and parser used by `edit`.
    edit_format: EditFormat,
}
85
impl EditAgent {
    /// Creates an agent that requests completions from `model`, reports
    /// buffer activity to `action_log`, publishes its location to `project`,
    /// renders prompts with `templates`, and parses edits according to
    /// `edit_format`.
    pub fn new(
        model: Arc<dyn LanguageModel>,
        project: Entity<Project>,
        action_log: Entity<ActionLog>,
        templates: Arc<Templates>,
        edit_format: EditFormat,
    ) -> Self {
        EditAgent {
            model,
            project,
            action_log,
            templates,
            edit_format,
        }
    }

    /// Replaces the whole content of `buffer` with text streamed from the
    /// model.
    ///
    /// The create-file prompt is rendered from the buffer's resolved path and
    /// `edit_description`, then sent as the final user message of a clone of
    /// `conversation`.
    ///
    /// Returns the task resolving to the final [`EditAgentOutput`] and a
    /// receiver for the [`EditAgentOutputEvent`]s emitted while the buffer is
    /// being rewritten.
    pub fn overwrite(
        &self,
        buffer: Entity<Buffer>,
        edit_description: String,
        conversation: &LanguageModelRequest,
        cx: &mut AsyncApp,
    ) -> (
        Task<Result<EditAgentOutput>>,
        mpsc::UnboundedReceiver<EditAgentOutputEvent>,
    ) {
        let this = self.clone();
        let (events_tx, events_rx) = mpsc::unbounded();
        let conversation = conversation.clone();
        let output = cx.spawn(async move |cx| {
            let snapshot = buffer.read_with(cx, |buffer, _| buffer.snapshot())?;
            let path = cx.update(|cx| snapshot.resolve_file_path(cx, true))?;
            let prompt = CreateFilePromptTemplate {
                path,
                edit_description,
            }
            .render(&this.templates)?;
            let new_chunks = this
                .request(conversation, CompletionIntent::CreateFile, prompt, cx)
                .await?;

            // Forward every inner event to the caller's channel; a closed
            // receiver is not an error, so send failures are ignored.
            let (output, mut inner_events) = this.overwrite_with_chunks(buffer, new_chunks, cx);
            while let Some(event) = inner_events.next().await {
                events_tx.unbounded_send(event).ok();
            }
            output.await
        });
        (output, events_rx)
    }

    /// Overwrites `buffer` with the text parsed out of `edit_chunks`.
    ///
    /// The buffer is first reported to the action log as created, then the
    /// parsed chunks are streamed into it. The returned task resolves to the
    /// output of the parsing task; the receiver yields the events emitted
    /// while writing.
    fn overwrite_with_chunks(
        &self,
        buffer: Entity<Buffer>,
        edit_chunks: impl 'static + Send + Stream<Item = Result<String, LanguageModelCompletionError>>,
        cx: &mut AsyncApp,
    ) -> (
        Task<Result<EditAgentOutput>>,
        mpsc::UnboundedReceiver<EditAgentOutputEvent>,
    ) {
        let (output_events_tx, output_events_rx) = mpsc::unbounded();
        let (parse_task, parse_rx) = Self::parse_create_file_chunks(edit_chunks, cx);
        let this = self.clone();
        let task = cx.spawn(async move |cx| {
            this.action_log
                .update(cx, |log, cx| log.buffer_created(buffer.clone(), cx))?;
            this.overwrite_with_chunks_internal(buffer, parse_rx, output_events_tx, cx)
                .await?;
            parse_task.await
        });
        (task, output_events_rx)
    }

    /// Clears `buffer`, then appends every `NewTextChunk` received from
    /// `parse_rx` until the channel closes.
    ///
    /// After clearing and after each appended chunk, the edit is reported to
    /// the action log, the agent location is set to `Anchor::MAX` in the
    /// buffer, and an `Edited` event is sent on `output_events_tx`.
    ///
    /// # Errors
    ///
    /// Returns the first error received from `parse_rx`, or any error
    /// produced by `cx.update`.
    async fn overwrite_with_chunks_internal(
        &self,
        buffer: Entity<Buffer>,
        mut parse_rx: UnboundedReceiver<Result<CreateFileParserEvent>>,
        output_events_tx: mpsc::UnboundedSender<EditAgentOutputEvent>,
        cx: &mut AsyncApp,
    ) -> Result<()> {
        cx.update(|cx| {
            buffer.update(cx, |buffer, cx| buffer.set_text("", cx));
            self.action_log.update(cx, |log, cx| {
                log.buffer_edited(buffer.clone(), cx);
            });
            self.project.update(cx, |project, cx| {
                project.set_agent_location(
                    Some(AgentLocation {
                        buffer: buffer.downgrade(),
                        position: language::Anchor::MAX,
                    }),
                    cx,
                )
            });
            output_events_tx
                .unbounded_send(EditAgentOutputEvent::Edited)
                .ok();
        })?;

        while let Some(event) = parse_rx.next().await {
            match event? {
                CreateFileParserEvent::NewTextChunk { chunk } => {
                    // The buffer edit and the action-log report happen inside
                    // the same `cx.update` call.
                    cx.update(|cx| {
                        buffer.update(cx, |buffer, cx| buffer.append(chunk, cx));
                        self.action_log
                            .update(cx, |log, cx| log.buffer_edited(buffer.clone(), cx));
                        self.project.update(cx, |project, cx| {
                            project.set_agent_location(
                                Some(AgentLocation {
                                    buffer: buffer.downgrade(),
                                    position: language::Anchor::MAX,
                                }),
                                cx,
                            )
                        });
                    })?;
                    output_events_tx
                        .unbounded_send(EditAgentOutputEvent::Edited)
                        .ok();
                }
            }
        }

        Ok(())
    }

    /// Applies model-generated edits to `buffer`.
    ///
    /// The prompt template is chosen from the agent's edit format, rendered
    /// with the buffer's resolved path and `edit_description`, and sent as
    /// the final user message of a clone of `conversation`. The streamed
    /// response is parsed and applied incrementally.
    ///
    /// Returns the task resolving to the final [`EditAgentOutput`] and a
    /// receiver for the [`EditAgentOutputEvent`]s emitted along the way.
    pub fn edit(
        &self,
        buffer: Entity<Buffer>,
        edit_description: String,
        conversation: &LanguageModelRequest,
        cx: &mut AsyncApp,
    ) -> (
        Task<Result<EditAgentOutput>>,
        mpsc::UnboundedReceiver<EditAgentOutputEvent>,
    ) {
        let this = self.clone();
        let (events_tx, events_rx) = mpsc::unbounded();
        let conversation = conversation.clone();
        let edit_format = self.edit_format;
        let output = cx.spawn(async move |cx| {
            let snapshot = buffer.read_with(cx, |buffer, _| buffer.snapshot())?;
            let path = cx.update(|cx| snapshot.resolve_file_path(cx, true))?;
            let prompt = match edit_format {
                EditFormat::XmlTags => EditFileXmlPromptTemplate {
                    path,
                    edit_description,
                }
                .render(&this.templates)?,
                EditFormat::DiffFenced => EditFileDiffFencedPromptTemplate {
                    path,
                    edit_description,
                }
                .render(&this.templates)?,
            };

            let edit_chunks = this
                .request(conversation, CompletionIntent::EditFile, prompt, cx)
                .await?;
            this.apply_edit_chunks(buffer, edit_chunks, events_tx, cx)
                .await
        });
        (output, events_rx)
    }

    /// Parses `edit_chunks` and applies each edit to `buffer` as it streams.
    ///
    /// For every edit, in order:
    /// 1. the old text is resolved against a fresh snapshot of the buffer,
    ///    emitting `ResolvingEditRange` and moving the agent location for
    ///    each candidate range;
    /// 2. if the old text resolves to zero or several ranges, an
    ///    `UnresolvedEditRange` / `AmbiguousEditRange` event is sent and the
    ///    edit is skipped;
    /// 3. otherwise the diff between the old and new text is computed and
    ///    applied in batches, emitting `Edited` after each batch.
    ///
    /// Ownership of the parser event stream is handed to the helper tasks and
    /// returned by them when they finish, which is why `edit_events` is
    /// reassigned inside the loop.
    ///
    /// Failures to send on `output_events` are ignored.
    async fn apply_edit_chunks(
        &self,
        buffer: Entity<Buffer>,
        edit_chunks: impl 'static + Send + Stream<Item = Result<String, LanguageModelCompletionError>>,
        output_events: mpsc::UnboundedSender<EditAgentOutputEvent>,
        cx: &mut AsyncApp,
    ) -> Result<EditAgentOutput> {
        self.action_log
            .update(cx, |log, cx| log.buffer_read(buffer.clone(), cx))?;

        let (output, edit_events) = Self::parse_edit_chunks(edit_chunks, self.edit_format, cx);
        let mut edit_events = edit_events.peekable();
        while let Some(edit_event) = Pin::new(&mut edit_events).peek().await {
            // Skip events until we're at the start of a new edit.
            let Ok(EditParserEvent::OldTextChunk { .. }) = edit_event else {
                // `peek` just returned `Some`, so `next` cannot yield `None`;
                // a peeked error is propagated by the `?`.
                edit_events.next().await.unwrap()?;
                continue;
            };

            let snapshot = buffer.read_with(cx, |buffer, _| buffer.snapshot())?;

            // Resolve the old text in the background, updating the agent
            // location as we keep refining which range it corresponds to.
            let (resolve_old_text, mut old_range) =
                Self::resolve_old_text(snapshot.text.clone(), edit_events, cx);
            while let Ok(old_range) = old_range.recv().await {
                if let Some(old_range) = old_range {
                    let old_range = snapshot.anchor_before(old_range.start)
                        ..snapshot.anchor_before(old_range.end);
                    self.project.update(cx, |project, cx| {
                        project.set_agent_location(
                            Some(AgentLocation {
                                buffer: buffer.downgrade(),
                                position: old_range.end,
                            }),
                            cx,
                        );
                    })?;
                    output_events
                        .unbounded_send(EditAgentOutputEvent::ResolvingEditRange(old_range))
                        .ok();
                }
            }

            // Take the event stream back from the resolver task.
            let (edit_events_, mut resolved_old_text) = resolve_old_text.await?;
            edit_events = edit_events_;

            // If we can't resolve the old text, restart the loop waiting for a
            // new edit (or for the stream to end).
            let resolved_old_text = match resolved_old_text.len() {
                1 => resolved_old_text.pop().unwrap(),
                0 => {
                    output_events
                        .unbounded_send(EditAgentOutputEvent::UnresolvedEditRange)
                        .ok();
                    continue;
                }
                _ => {
                    // Report each candidate as a range of 1-based line numbers.
                    let ranges = resolved_old_text
                        .into_iter()
                        .map(|text| {
                            let start_line =
                                (snapshot.offset_to_point(text.range.start).row + 1) as usize;
                            let end_line =
                                (snapshot.offset_to_point(text.range.end).row + 1) as usize;
                            start_line..end_line
                        })
                        .collect();
                    output_events
                        .unbounded_send(EditAgentOutputEvent::AmbiguousEditRange(ranges))
                        .ok();
                    continue;
                }
            };

            // Compute edits in the background and apply them as they become
            // available.
            let (compute_edits, edits) =
                Self::compute_edits(snapshot, resolved_old_text, edit_events, cx);
            let mut edits = edits.ready_chunks(32);
            while let Some(edits) = edits.next().await {
                if edits.is_empty() {
                    continue;
                }

                // Edit the buffer and report edits to the action log as part of the
                // same effect cycle, otherwise the edit will be reported as if the
                // user made it.
                cx.update(|cx| {
                    let max_edit_end = buffer.update(cx, |buffer, cx| {
                        buffer.edit(edits.iter().cloned(), None, cx);
                        // `edits` is non-empty (checked above), so `max()`
                        // always yields a value.
                        let max_edit_end = buffer
                            .summaries_for_anchors::<Point, _>(
                                edits.iter().map(|(range, _)| &range.end),
                            )
                            .max()
                            .unwrap();
                        buffer.anchor_before(max_edit_end)
                    });
                    self.action_log
                        .update(cx, |log, cx| log.buffer_edited(buffer.clone(), cx));
                    self.project.update(cx, |project, cx| {
                        project.set_agent_location(
                            Some(AgentLocation {
                                buffer: buffer.downgrade(),
                                position: max_edit_end,
                            }),
                            cx,
                        );
                    });
                })?;
                output_events
                    .unbounded_send(EditAgentOutputEvent::Edited)
                    .ok();
            }

            // Take the event stream back from the diffing task.
            edit_events = compute_edits.await?;
        }

        output.await
    }

    /// Spawns a task (via `cx.background_spawn`) that feeds `chunks` into an
    /// [`EditParser`] for `edit_format` and forwards the resulting events.
    ///
    /// Stream errors are forwarded on the channel as `Err` items and parsing
    /// continues with the next chunk. The task stops with an error if the
    /// receiver has been dropped. It resolves to the concatenated raw text
    /// and the parser's final metrics.
    fn parse_edit_chunks(
        chunks: impl 'static + Send + Stream<Item = Result<String, LanguageModelCompletionError>>,
        edit_format: EditFormat,
        cx: &mut AsyncApp,
    ) -> (
        Task<Result<EditAgentOutput>>,
        UnboundedReceiver<Result<EditParserEvent>>,
    ) {
        let (tx, rx) = mpsc::unbounded();
        let output = cx.background_spawn(async move {
            pin_mut!(chunks);

            let mut parser = EditParser::new(edit_format);
            let mut raw_edits = String::new();
            while let Some(chunk) = chunks.next().await {
                match chunk {
                    Ok(chunk) => {
                        raw_edits.push_str(&chunk);
                        for event in parser.push(&chunk) {
                            tx.unbounded_send(Ok(event))?;
                        }
                    }
                    Err(error) => {
                        tx.unbounded_send(Err(error.into()))?;
                    }
                }
            }
            Ok(EditAgentOutput {
                raw_edits,
                parser_metrics: parser.finish(),
            })
        });
        (output, rx)
    }

    /// Spawns a task (via `cx.background_spawn`) that feeds `chunks` into a
    /// [`CreateFileParser`] and forwards the resulting events.
    ///
    /// Stream errors are forwarded on the channel as `Err` items. Once the
    /// stream ends the parser is pushed `None` so it can emit its final
    /// events. The task resolves to the concatenated raw text with default
    /// parser metrics.
    fn parse_create_file_chunks(
        chunks: impl 'static + Send + Stream<Item = Result<String, LanguageModelCompletionError>>,
        cx: &mut AsyncApp,
    ) -> (
        Task<Result<EditAgentOutput>>,
        UnboundedReceiver<Result<CreateFileParserEvent>>,
    ) {
        let (tx, rx) = mpsc::unbounded();
        let output = cx.background_spawn(async move {
            pin_mut!(chunks);

            let mut parser = CreateFileParser::new();
            let mut raw_edits = String::new();
            while let Some(chunk) = chunks.next().await {
                match chunk {
                    Ok(chunk) => {
                        raw_edits.push_str(&chunk);
                        for event in parser.push(Some(&chunk)) {
                            tx.unbounded_send(Ok(event))?;
                        }
                    }
                    Err(error) => {
                        tx.unbounded_send(Err(error.into()))?;
                    }
                }
            }
            // Send final events with None to indicate completion
            for event in parser.push(None) {
                tx.unbounded_send(Ok(event))?;
            }
            Ok(EditAgentOutput {
                raw_edits,
                parser_metrics: EditParserMetrics::default(),
            })
        });
        (output, rx)
    }

    /// Spawns a task that consumes `OldTextChunk` events from `edit_events`
    /// and matches the accumulated old text against `snapshot`.
    ///
    /// The watch receiver is updated with the matcher's current range after
    /// every chunk, and finally with the selected best match. The task stops
    /// consuming at the first chunk flagged `done`, at the first event that
    /// is not an `OldTextChunk`, or when the stream ends.
    ///
    /// The task resolves to the event stream (so the caller can keep reading
    /// from it) and the resolved candidates: exactly one entry when a best
    /// match was selected, otherwise one entry per range returned by
    /// `matcher.finish()` (possibly none). Each entry carries the indentation
    /// of the first line of the old text as written by the model.
    fn resolve_old_text<T>(
        snapshot: TextBufferSnapshot,
        mut edit_events: T,
        cx: &mut AsyncApp,
    ) -> (
        Task<Result<(T, Vec<ResolvedOldText>)>>,
        watch::Receiver<Option<Range<usize>>>,
    )
    where
        T: 'static + Send + Unpin + Stream<Item = Result<EditParserEvent>>,
    {
        let (mut old_range_tx, old_range_rx) = watch::channel(None);
        let task = cx.background_spawn(async move {
            let mut matcher = StreamingFuzzyMatcher::new(snapshot);
            while let Some(edit_event) = edit_events.next().await {
                let EditParserEvent::OldTextChunk {
                    chunk,
                    done,
                    line_hint,
                } = edit_event?
                else {
                    break;
                };

                old_range_tx.send(matcher.push(&chunk, line_hint))?;
                if done {
                    break;
                }
            }

            let matches = matcher.finish();
            let best_match = matcher.select_best_match();

            old_range_tx.send(best_match.clone())?;

            // Indentation of the first old-text line as the model wrote it;
            // an empty query is treated as an empty line.
            let indent = LineIndent::from_iter(
                matcher
                    .query_lines()
                    .first()
                    .unwrap_or(&String::new())
                    .chars(),
            );

            let resolved_old_texts = if let Some(best_match) = best_match {
                vec![ResolvedOldText {
                    range: best_match,
                    indent,
                }]
            } else {
                matches
                    .into_iter()
                    .map(|range| ResolvedOldText { range, indent })
                    .collect::<Vec<_>>()
            };

            Ok((edit_events, resolved_old_texts))
        });

        (task, old_range_rx)
    }

    /// Spawns a task that diffs the resolved old text against the new text
    /// streamed through `edit_events` and emits buffer edits as they are
    /// computed.
    ///
    /// New text is re-indented by the difference between the indentation of
    /// the matched line in `snapshot` and the indentation the model used for
    /// the old text (measured in tabs when the buffer line has tabs, in
    /// spaces otherwise). Each emitted edit is an anchor range in `snapshot`
    /// plus its replacement text: insertions use an empty range, deletions an
    /// empty replacement.
    ///
    /// The task resolves to the event stream once the new text is exhausted.
    fn compute_edits<T>(
        snapshot: BufferSnapshot,
        resolved_old_text: ResolvedOldText,
        mut edit_events: T,
        cx: &mut AsyncApp,
    ) -> (
        Task<Result<T>>,
        UnboundedReceiver<(Range<Anchor>, Arc<str>)>,
    )
    where
        T: 'static + Send + Unpin + Stream<Item = Result<EditParserEvent>>,
    {
        let (edits_tx, edits_rx) = mpsc::unbounded();
        let compute_edits = cx.background_spawn(async move {
            let buffer_start_indent = snapshot
                .line_indent_for_row(snapshot.offset_to_point(resolved_old_text.range.start).row);
            let indent_delta = if buffer_start_indent.tabs > 0 {
                IndentDelta::Tabs(
                    buffer_start_indent.tabs as isize - resolved_old_text.indent.tabs as isize,
                )
            } else {
                IndentDelta::Spaces(
                    buffer_start_indent.spaces as isize - resolved_old_text.indent.spaces as isize,
                )
            };

            let old_text = snapshot
                .text_for_range(resolved_old_text.range.clone())
                .collect::<String>();
            let mut diff = StreamingDiff::new(old_text);
            // Offset in `snapshot` up to which the old text has been consumed.
            let mut edit_start = resolved_old_text.range.start;
            let mut new_text_chunks =
                Self::reindent_new_text_chunks(indent_delta, &mut edit_events);
            let mut done = false;
            while !done {
                let char_operations = if let Some(new_text_chunk) = new_text_chunks.next().await {
                    diff.push_new(&new_text_chunk?)
                } else {
                    // No more new text: flush the remaining operations.
                    done = true;
                    mem::take(&mut diff).finish()
                };

                for op in char_operations {
                    match op {
                        CharOperation::Insert { text } => {
                            let edit_start = snapshot.anchor_after(edit_start);
                            edits_tx.unbounded_send((edit_start..edit_start, Arc::from(text)))?;
                        }
                        CharOperation::Delete { bytes } => {
                            let edit_end = edit_start + bytes;
                            let edit_range =
                                snapshot.anchor_after(edit_start)..snapshot.anchor_before(edit_end);
                            edit_start = edit_end;
                            edits_tx.unbounded_send((edit_range, Arc::from("")))?;
                        }
                        CharOperation::Keep { bytes } => edit_start += bytes,
                    }
                }
            }

            // Release the mutable borrow of `edit_events` before returning it.
            drop(new_text_chunks);
            anyhow::Ok(edit_events)
        });

        (compute_edits, edits_rx)
    }

    /// Adapts a stream of parser events into a stream of re-indented new-text
    /// chunks.
    ///
    /// Only `NewTextChunk` events are consumed; any other event or the end of
    /// `stream` terminates the output, and errors are passed through. Text is
    /// buffered so that indentation is adjusted only once the first
    /// non-indent character of a line is known: the leading run of the
    /// delta's indent character is resized by `delta.len()` (clamped at
    /// zero). Lines made up solely of the indent character are emitted
    /// unchanged. When the chunk flagged as last arrives, whatever is still
    /// buffered is emitted as-is.
    fn reindent_new_text_chunks(
        delta: IndentDelta,
        mut stream: impl Unpin + Stream<Item = Result<EditParserEvent>>,
    ) -> impl Stream<Item = Result<String>> {
        // Text received but not yet emitted (an incomplete trailing line).
        let mut buffer = String::new();
        // Whether the next unprocessed byte is still in a line's indent.
        let mut in_leading_whitespace = true;
        let mut done = false;
        futures::stream::poll_fn(move |cx| {
            while !done {
                let (chunk, is_last_chunk) = match stream.poll_next_unpin(cx) {
                    Poll::Ready(Some(Ok(EditParserEvent::NewTextChunk { chunk, done }))) => {
                        (chunk, done)
                    }
                    Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err))),
                    Poll::Pending => return Poll::Pending,
                    _ => return Poll::Ready(None),
                };

                buffer.push_str(&chunk);

                let mut indented_new_text = String::new();
                let mut start_ix = 0;
                let mut newlines = buffer.match_indices('\n').peekable();
                loop {
                    let (line_end, is_pending_line) = match newlines.next() {
                        Some((ix, _)) => (ix, false),
                        None => (buffer.len(), true),
                    };
                    let line = &buffer[start_ix..line_end];

                    if in_leading_whitespace {
                        if let Some(non_whitespace_ix) = line.find(|c| delta.character() != c) {
                            // We found a non-whitespace character, adjust
                            // indentation based on the delta.
                            let new_indent_len =
                                cmp::max(0, non_whitespace_ix as isize + delta.len()) as usize;
                            indented_new_text
                                .extend(iter::repeat(delta.character()).take(new_indent_len));
                            indented_new_text.push_str(&line[non_whitespace_ix..]);
                            in_leading_whitespace = false;
                        } else if is_pending_line {
                            // We're still in leading whitespace and this line is incomplete.
                            // Stop processing until we receive more input.
                            break;
                        } else {
                            // This line is entirely whitespace. Push it without indentation.
                            indented_new_text.push_str(line);
                        }
                    } else {
                        indented_new_text.push_str(line);
                    }

                    if is_pending_line {
                        start_ix = line_end;
                        break;
                    } else {
                        in_leading_whitespace = true;
                        indented_new_text.push('\n');
                        start_ix = line_end + 1;
                    }
                }
                // Drop everything that was emitted; keep the unprocessed tail.
                buffer.replace_range(..start_ix, "");

                // This was the last chunk, push all the buffered content as-is.
                if is_last_chunk {
                    indented_new_text.push_str(&buffer);
                    buffer.clear();
                    done = true;
                }

                if !indented_new_text.is_empty() {
                    return Poll::Ready(Some(Ok(indented_new_text)));
                }
            }

            Poll::Ready(None)
        })
    }

    /// Sends `prompt` to the model as the final user message of
    /// `conversation` and returns the stream of completion text chunks.
    ///
    /// Before appending the prompt, a trailing assistant message has its
    /// tool-use content removed and is dropped entirely if that leaves it
    /// empty. The new request carries over the thread id, prompt id and mode
    /// of `conversation`, sets `intent`, and forwards the conversation's
    /// tools only when the model supports `LanguageModelToolChoice::None`.
    async fn request(
        &self,
        mut conversation: LanguageModelRequest,
        intent: CompletionIntent,
        prompt: String,
        cx: &mut AsyncApp,
    ) -> Result<BoxStream<'static, Result<String, LanguageModelCompletionError>>> {
        let mut messages_iter = conversation.messages.iter_mut();
        if let Some(last_message) = messages_iter.next_back() {
            if last_message.role == Role::Assistant {
                let old_content_len = last_message.content.len();
                last_message
                    .content
                    .retain(|content| !matches!(content, MessageContent::ToolUse(_)));
                let new_content_len = last_message.content.len();

                // We just removed pending tool uses from the content of the
                // last message, so it doesn't make sense to cache it anymore
                // (e.g., the message will look very different on the next
                // request). Thus, we move the flag to the message prior to it,
                // as it will still be a valid prefix of the conversation.
                // If there is no prior message, the flag is left untouched.
                if old_content_len != new_content_len && last_message.cache {
                    if let Some(prev_message) = messages_iter.next_back() {
                        last_message.cache = false;
                        prev_message.cache = true;
                    }
                }

                if last_message.content.is_empty() {
                    conversation.messages.pop();
                }
            }
        }

        conversation.messages.push(LanguageModelRequestMessage {
            role: Role::User,
            content: vec![MessageContent::Text(prompt)],
            cache: false,
        });

        // Include tools in the request so that we can take advantage of
        // caching when ToolChoice::None is supported.
        let mut tool_choice = None;
        let mut tools = Vec::new();
        if !conversation.tools.is_empty()
            && self
                .model
                .supports_tool_choice(LanguageModelToolChoice::None)
        {
            tool_choice = Some(LanguageModelToolChoice::None);
            tools = conversation.tools.clone();
        }

        let request = LanguageModelRequest {
            thread_id: conversation.thread_id,
            prompt_id: conversation.prompt_id,
            intent: Some(intent),
            mode: conversation.mode,
            messages: conversation.messages,
            tool_choice,
            tools,
            stop: Vec::new(),
            temperature: None,
            thinking_allowed: true,
        };

        Ok(self.model.stream_completion_text(request, cx).await?.stream)
    }
}
722
/// A location in the buffer snapshot that the old text of an edit was matched
/// to.
struct ResolvedOldText {
    /// Offset range of the match within the snapshot.
    range: Range<usize>,
    /// Indentation of the first line of the old text as written by the model
    /// (not the indentation found in the buffer).
    indent: LineIndent,
}
727
/// Signed indentation adjustment applied to streamed new text, expressed
/// either in spaces or in tabs.
#[derive(Copy, Clone, Debug)]
enum IndentDelta {
    /// Adjust by this many space characters (negative removes indentation).
    Spaces(isize),
    /// Adjust by this many tab characters (negative removes indentation).
    Tabs(isize),
}

impl IndentDelta {
    /// Returns the indentation character this delta is measured in.
    fn character(&self) -> char {
        if matches!(self, Self::Tabs(_)) {
            '\t'
        } else {
            ' '
        }
    }

    /// Returns the signed number of indentation characters to add.
    fn len(&self) -> isize {
        // Both variants carry the same payload type, so one irrefutable
        // or-pattern extracts it.
        let (Self::Spaces(amount) | Self::Tabs(amount)) = *self;
        amount
    }
}
749
750#[cfg(test)]
751mod tests {
752 use super::*;
753 use fs::FakeFs;
754 use futures::stream;
755 use gpui::{AppContext, TestAppContext};
756 use indoc::indoc;
757 use language_model::fake_provider::FakeLanguageModel;
758 use project::{AgentLocation, Project};
759 use rand::prelude::*;
760 use rand::rngs::StdRng;
761 use std::cmp;
762
763 #[gpui::test(iterations = 100)]
764 async fn test_empty_old_text(cx: &mut TestAppContext, mut rng: StdRng) {
765 let agent = init_test(cx).await;
766 let buffer = cx.new(|cx| {
767 Buffer::local(
768 indoc! {"
769 abc
770 def
771 ghi
772 "},
773 cx,
774 )
775 });
776 let (apply, _events) = agent.edit(
777 buffer.clone(),
778 String::new(),
779 &LanguageModelRequest::default(),
780 &mut cx.to_async(),
781 );
782 cx.run_until_parked();
783
784 simulate_llm_output(
785 &agent,
786 indoc! {"
787 <old_text></old_text>
788 <new_text>jkl</new_text>
789 <old_text>def</old_text>
790 <new_text>DEF</new_text>
791 "},
792 &mut rng,
793 cx,
794 );
795 apply.await.unwrap();
796
797 pretty_assertions::assert_eq!(
798 buffer.read_with(cx, |buffer, _| buffer.snapshot().text()),
799 indoc! {"
800 abc
801 DEF
802 ghi
803 "}
804 );
805 }
806
807 #[gpui::test(iterations = 100)]
808 async fn test_indentation(cx: &mut TestAppContext, mut rng: StdRng) {
809 let agent = init_test(cx).await;
810 let buffer = cx.new(|cx| {
811 Buffer::local(
812 indoc! {"
813 lorem
814 ipsum
815 dolor
816 sit
817 "},
818 cx,
819 )
820 });
821 let (apply, _events) = agent.edit(
822 buffer.clone(),
823 String::new(),
824 &LanguageModelRequest::default(),
825 &mut cx.to_async(),
826 );
827 cx.run_until_parked();
828
829 simulate_llm_output(
830 &agent,
831 indoc! {"
832 <old_text>
833 ipsum
834 dolor
835 sit
836 </old_text>
837 <new_text>
838 ipsum
839 dolor
840 sit
841 amet
842 </new_text>
843 "},
844 &mut rng,
845 cx,
846 );
847 apply.await.unwrap();
848
849 pretty_assertions::assert_eq!(
850 buffer.read_with(cx, |buffer, _| buffer.snapshot().text()),
851 indoc! {"
852 lorem
853 ipsum
854 dolor
855 sit
856 amet
857 "}
858 );
859 }
860
861 #[gpui::test(iterations = 100)]
862 async fn test_dependent_edits(cx: &mut TestAppContext, mut rng: StdRng) {
863 let agent = init_test(cx).await;
864 let buffer = cx.new(|cx| Buffer::local("abc\ndef\nghi", cx));
865 let (apply, _events) = agent.edit(
866 buffer.clone(),
867 String::new(),
868 &LanguageModelRequest::default(),
869 &mut cx.to_async(),
870 );
871 cx.run_until_parked();
872
873 simulate_llm_output(
874 &agent,
875 indoc! {"
876 <old_text>
877 def
878 </old_text>
879 <new_text>
880 DEF
881 </new_text>
882
883 <old_text>
884 DEF
885 </old_text>
886 <new_text>
887 DeF
888 </new_text>
889 "},
890 &mut rng,
891 cx,
892 );
893 apply.await.unwrap();
894
895 assert_eq!(
896 buffer.read_with(cx, |buffer, _| buffer.snapshot().text()),
897 "abc\nDeF\nghi"
898 );
899 }
900
901 #[gpui::test(iterations = 100)]
902 async fn test_old_text_hallucination(cx: &mut TestAppContext, mut rng: StdRng) {
903 let agent = init_test(cx).await;
904 let buffer = cx.new(|cx| Buffer::local("abc\ndef\nghi", cx));
905 let (apply, _events) = agent.edit(
906 buffer.clone(),
907 String::new(),
908 &LanguageModelRequest::default(),
909 &mut cx.to_async(),
910 );
911 cx.run_until_parked();
912
913 simulate_llm_output(
914 &agent,
915 indoc! {"
916 <old_text>
917 jkl
918 </old_text>
919 <new_text>
920 mno
921 </new_text>
922
923 <old_text>
924 abc
925 </old_text>
926 <new_text>
927 ABC
928 </new_text>
929 "},
930 &mut rng,
931 cx,
932 );
933 apply.await.unwrap();
934
935 assert_eq!(
936 buffer.read_with(cx, |buffer, _| buffer.snapshot().text()),
937 "ABC\ndef\nghi"
938 );
939 }
940
941 #[gpui::test]
942 async fn test_edit_events(cx: &mut TestAppContext) {
943 let agent = init_test(cx).await;
944 let model = agent.model.as_fake();
945 let project = agent
946 .action_log
947 .read_with(cx, |log, _| log.project().clone());
948 let buffer = cx.new(|cx| Buffer::local("abc\ndef\nghi\njkl", cx));
949
950 let mut async_cx = cx.to_async();
951 let (apply, mut events) = agent.edit(
952 buffer.clone(),
953 String::new(),
954 &LanguageModelRequest::default(),
955 &mut async_cx,
956 );
957 cx.run_until_parked();
958
959 model.send_last_completion_stream_text_chunk("<old_text>a");
960 cx.run_until_parked();
961 assert_eq!(drain_events(&mut events), vec![]);
962 assert_eq!(
963 buffer.read_with(cx, |buffer, _| buffer.snapshot().text()),
964 "abc\ndef\nghi\njkl"
965 );
966 assert_eq!(
967 project.read_with(cx, |project, _| project.agent_location()),
968 None
969 );
970
971 model.send_last_completion_stream_text_chunk("bc</old_text>");
972 cx.run_until_parked();
973 assert_eq!(
974 drain_events(&mut events),
975 vec![EditAgentOutputEvent::ResolvingEditRange(buffer.read_with(
976 cx,
977 |buffer, _| buffer.anchor_before(Point::new(0, 0))
978 ..buffer.anchor_before(Point::new(0, 3))
979 ))]
980 );
981 assert_eq!(
982 buffer.read_with(cx, |buffer, _| buffer.snapshot().text()),
983 "abc\ndef\nghi\njkl"
984 );
985 assert_eq!(
986 project.read_with(cx, |project, _| project.agent_location()),
987 Some(AgentLocation {
988 buffer: buffer.downgrade(),
989 position: buffer.read_with(cx, |buffer, _| buffer.anchor_before(Point::new(0, 3)))
990 })
991 );
992
993 model.send_last_completion_stream_text_chunk("<new_text>abX");
994 cx.run_until_parked();
995 assert_eq!(drain_events(&mut events), [EditAgentOutputEvent::Edited]);
996 assert_eq!(
997 buffer.read_with(cx, |buffer, _| buffer.snapshot().text()),
998 "abXc\ndef\nghi\njkl"
999 );
1000 assert_eq!(
1001 project.read_with(cx, |project, _| project.agent_location()),
1002 Some(AgentLocation {
1003 buffer: buffer.downgrade(),
1004 position: buffer.read_with(cx, |buffer, _| buffer.anchor_before(Point::new(0, 3)))
1005 })
1006 );
1007
1008 model.send_last_completion_stream_text_chunk("cY");
1009 cx.run_until_parked();
1010 assert_eq!(drain_events(&mut events), [EditAgentOutputEvent::Edited]);
1011 assert_eq!(
1012 buffer.read_with(cx, |buffer, _| buffer.snapshot().text()),
1013 "abXcY\ndef\nghi\njkl"
1014 );
1015 assert_eq!(
1016 project.read_with(cx, |project, _| project.agent_location()),
1017 Some(AgentLocation {
1018 buffer: buffer.downgrade(),
1019 position: buffer.read_with(cx, |buffer, _| buffer.anchor_before(Point::new(0, 5)))
1020 })
1021 );
1022
1023 model.send_last_completion_stream_text_chunk("</new_text>");
1024 model.send_last_completion_stream_text_chunk("<old_text>hall");
1025 cx.run_until_parked();
1026 assert_eq!(drain_events(&mut events), vec![]);
1027 assert_eq!(
1028 buffer.read_with(cx, |buffer, _| buffer.snapshot().text()),
1029 "abXcY\ndef\nghi\njkl"
1030 );
1031 assert_eq!(
1032 project.read_with(cx, |project, _| project.agent_location()),
1033 Some(AgentLocation {
1034 buffer: buffer.downgrade(),
1035 position: buffer.read_with(cx, |buffer, _| buffer.anchor_before(Point::new(0, 5)))
1036 })
1037 );
1038
1039 model.send_last_completion_stream_text_chunk("ucinated old</old_text>");
1040 model.send_last_completion_stream_text_chunk("<new_text>");
1041 cx.run_until_parked();
1042 assert_eq!(
1043 drain_events(&mut events),
1044 vec![EditAgentOutputEvent::UnresolvedEditRange]
1045 );
1046 assert_eq!(
1047 buffer.read_with(cx, |buffer, _| buffer.snapshot().text()),
1048 "abXcY\ndef\nghi\njkl"
1049 );
1050 assert_eq!(
1051 project.read_with(cx, |project, _| project.agent_location()),
1052 Some(AgentLocation {
1053 buffer: buffer.downgrade(),
1054 position: buffer.read_with(cx, |buffer, _| buffer.anchor_before(Point::new(0, 5)))
1055 })
1056 );
1057
1058 model.send_last_completion_stream_text_chunk("hallucinated new</new_");
1059 model.send_last_completion_stream_text_chunk("text>");
1060 cx.run_until_parked();
1061 assert_eq!(drain_events(&mut events), vec![]);
1062 assert_eq!(
1063 buffer.read_with(cx, |buffer, _| buffer.snapshot().text()),
1064 "abXcY\ndef\nghi\njkl"
1065 );
1066 assert_eq!(
1067 project.read_with(cx, |project, _| project.agent_location()),
1068 Some(AgentLocation {
1069 buffer: buffer.downgrade(),
1070 position: buffer.read_with(cx, |buffer, _| buffer.anchor_before(Point::new(0, 5)))
1071 })
1072 );
1073
1074 model.send_last_completion_stream_text_chunk("<old_text>\nghi\nj");
1075 cx.run_until_parked();
1076 assert_eq!(
1077 drain_events(&mut events),
1078 vec![EditAgentOutputEvent::ResolvingEditRange(buffer.read_with(
1079 cx,
1080 |buffer, _| buffer.anchor_before(Point::new(2, 0))
1081 ..buffer.anchor_before(Point::new(2, 3))
1082 ))]
1083 );
1084 assert_eq!(
1085 buffer.read_with(cx, |buffer, _| buffer.snapshot().text()),
1086 "abXcY\ndef\nghi\njkl"
1087 );
1088 assert_eq!(
1089 project.read_with(cx, |project, _| project.agent_location()),
1090 Some(AgentLocation {
1091 buffer: buffer.downgrade(),
1092 position: buffer.read_with(cx, |buffer, _| buffer.anchor_before(Point::new(2, 3)))
1093 })
1094 );
1095
1096 model.send_last_completion_stream_text_chunk("kl</old_text>");
1097 model.send_last_completion_stream_text_chunk("<new_text>");
1098 cx.run_until_parked();
1099 assert_eq!(
1100 drain_events(&mut events),
1101 vec![EditAgentOutputEvent::ResolvingEditRange(buffer.read_with(
1102 cx,
1103 |buffer, _| buffer.anchor_before(Point::new(2, 0))
1104 ..buffer.anchor_before(Point::new(3, 3))
1105 ))]
1106 );
1107 assert_eq!(
1108 buffer.read_with(cx, |buffer, _| buffer.snapshot().text()),
1109 "abXcY\ndef\nghi\njkl"
1110 );
1111 assert_eq!(
1112 project.read_with(cx, |project, _| project.agent_location()),
1113 Some(AgentLocation {
1114 buffer: buffer.downgrade(),
1115 position: buffer.read_with(cx, |buffer, _| buffer.anchor_before(Point::new(3, 3)))
1116 })
1117 );
1118
1119 model.send_last_completion_stream_text_chunk("GHI</new_text>");
1120 cx.run_until_parked();
1121 assert_eq!(
1122 drain_events(&mut events),
1123 vec![EditAgentOutputEvent::Edited]
1124 );
1125 assert_eq!(
1126 buffer.read_with(cx, |buffer, _| buffer.snapshot().text()),
1127 "abXcY\ndef\nGHI"
1128 );
1129 assert_eq!(
1130 project.read_with(cx, |project, _| project.agent_location()),
1131 Some(AgentLocation {
1132 buffer: buffer.downgrade(),
1133 position: buffer.read_with(cx, |buffer, _| buffer.anchor_before(Point::new(2, 3)))
1134 })
1135 );
1136
1137 model.end_last_completion_stream();
1138 apply.await.unwrap();
1139 assert_eq!(
1140 buffer.read_with(cx, |buffer, _| buffer.snapshot().text()),
1141 "abXcY\ndef\nGHI"
1142 );
1143 assert_eq!(drain_events(&mut events), vec![]);
1144 assert_eq!(
1145 project.read_with(cx, |project, _| project.agent_location()),
1146 Some(AgentLocation {
1147 buffer: buffer.downgrade(),
1148 position: buffer.read_with(cx, |buffer, _| buffer.anchor_before(Point::new(2, 3)))
1149 })
1150 );
1151 }
1152
1153 #[gpui::test]
1154 async fn test_overwrite_events(cx: &mut TestAppContext) {
1155 let agent = init_test(cx).await;
1156 let project = agent
1157 .action_log
1158 .read_with(cx, |log, _| log.project().clone());
1159 let buffer = cx.new(|cx| Buffer::local("abc\ndef\nghi", cx));
1160 let (chunks_tx, chunks_rx) = mpsc::unbounded();
1161 let (apply, mut events) = agent.overwrite_with_chunks(
1162 buffer.clone(),
1163 chunks_rx.map(|chunk: &str| Ok(chunk.to_string())),
1164 &mut cx.to_async(),
1165 );
1166
1167 cx.run_until_parked();
1168 assert_eq!(
1169 drain_events(&mut events),
1170 vec![EditAgentOutputEvent::Edited]
1171 );
1172 assert_eq!(
1173 buffer.read_with(cx, |buffer, _| buffer.snapshot().text()),
1174 ""
1175 );
1176 assert_eq!(
1177 project.read_with(cx, |project, _| project.agent_location()),
1178 Some(AgentLocation {
1179 buffer: buffer.downgrade(),
1180 position: language::Anchor::MAX
1181 })
1182 );
1183
1184 chunks_tx.unbounded_send("```\njkl\n").unwrap();
1185 cx.run_until_parked();
1186 assert_eq!(
1187 drain_events(&mut events),
1188 vec![EditAgentOutputEvent::Edited]
1189 );
1190 assert_eq!(
1191 buffer.read_with(cx, |buffer, _| buffer.snapshot().text()),
1192 "jkl"
1193 );
1194 assert_eq!(
1195 project.read_with(cx, |project, _| project.agent_location()),
1196 Some(AgentLocation {
1197 buffer: buffer.downgrade(),
1198 position: language::Anchor::MAX
1199 })
1200 );
1201
1202 chunks_tx.unbounded_send("mno\n").unwrap();
1203 cx.run_until_parked();
1204 assert_eq!(
1205 drain_events(&mut events),
1206 vec![EditAgentOutputEvent::Edited]
1207 );
1208 assert_eq!(
1209 buffer.read_with(cx, |buffer, _| buffer.snapshot().text()),
1210 "jkl\nmno"
1211 );
1212 assert_eq!(
1213 project.read_with(cx, |project, _| project.agent_location()),
1214 Some(AgentLocation {
1215 buffer: buffer.downgrade(),
1216 position: language::Anchor::MAX
1217 })
1218 );
1219
1220 chunks_tx.unbounded_send("pqr\n```").unwrap();
1221 cx.run_until_parked();
1222 assert_eq!(
1223 drain_events(&mut events),
1224 vec![EditAgentOutputEvent::Edited]
1225 );
1226 assert_eq!(
1227 buffer.read_with(cx, |buffer, _| buffer.snapshot().text()),
1228 "jkl\nmno\npqr"
1229 );
1230 assert_eq!(
1231 project.read_with(cx, |project, _| project.agent_location()),
1232 Some(AgentLocation {
1233 buffer: buffer.downgrade(),
1234 position: language::Anchor::MAX
1235 })
1236 );
1237
1238 drop(chunks_tx);
1239 apply.await.unwrap();
1240 assert_eq!(
1241 buffer.read_with(cx, |buffer, _| buffer.snapshot().text()),
1242 "jkl\nmno\npqr"
1243 );
1244 assert_eq!(drain_events(&mut events), vec![]);
1245 assert_eq!(
1246 project.read_with(cx, |project, _| project.agent_location()),
1247 Some(AgentLocation {
1248 buffer: buffer.downgrade(),
1249 position: language::Anchor::MAX
1250 })
1251 );
1252 }
1253
1254 #[gpui::test(iterations = 100)]
1255 async fn test_indent_new_text_chunks(mut rng: StdRng) {
1256 let chunks = to_random_chunks(&mut rng, " abc\n def\n ghi");
1257 let new_text_chunks = stream::iter(chunks.iter().enumerate().map(|(index, chunk)| {
1258 Ok(EditParserEvent::NewTextChunk {
1259 chunk: chunk.clone(),
1260 done: index == chunks.len() - 1,
1261 })
1262 }));
1263 let indented_chunks =
1264 EditAgent::reindent_new_text_chunks(IndentDelta::Spaces(2), new_text_chunks)
1265 .collect::<Vec<_>>()
1266 .await;
1267 let new_text = indented_chunks
1268 .into_iter()
1269 .collect::<Result<String>>()
1270 .unwrap();
1271 assert_eq!(new_text, " abc\n def\n ghi");
1272 }
1273
1274 #[gpui::test(iterations = 100)]
1275 async fn test_outdent_new_text_chunks(mut rng: StdRng) {
1276 let chunks = to_random_chunks(&mut rng, "\t\t\t\tabc\n\t\tdef\n\t\t\t\t\t\tghi");
1277 let new_text_chunks = stream::iter(chunks.iter().enumerate().map(|(index, chunk)| {
1278 Ok(EditParserEvent::NewTextChunk {
1279 chunk: chunk.clone(),
1280 done: index == chunks.len() - 1,
1281 })
1282 }));
1283 let indented_chunks =
1284 EditAgent::reindent_new_text_chunks(IndentDelta::Tabs(-2), new_text_chunks)
1285 .collect::<Vec<_>>()
1286 .await;
1287 let new_text = indented_chunks
1288 .into_iter()
1289 .collect::<Result<String>>()
1290 .unwrap();
1291 assert_eq!(new_text, "\t\tabc\ndef\n\t\t\t\tghi");
1292 }
1293
1294 #[gpui::test(iterations = 100)]
1295 async fn test_random_indents(mut rng: StdRng) {
1296 let len = rng.gen_range(1..=100);
1297 let new_text = util::RandomCharIter::new(&mut rng)
1298 .with_simple_text()
1299 .take(len)
1300 .collect::<String>();
1301 let new_text = new_text
1302 .split('\n')
1303 .map(|line| format!("{}{}", " ".repeat(rng.gen_range(0..=8)), line))
1304 .collect::<Vec<_>>()
1305 .join("\n");
1306 let delta = IndentDelta::Spaces(rng.gen_range(-4..=4));
1307
1308 let chunks = to_random_chunks(&mut rng, &new_text);
1309 let new_text_chunks = stream::iter(chunks.iter().enumerate().map(|(index, chunk)| {
1310 Ok(EditParserEvent::NewTextChunk {
1311 chunk: chunk.clone(),
1312 done: index == chunks.len() - 1,
1313 })
1314 }));
1315 let reindented_chunks = EditAgent::reindent_new_text_chunks(delta, new_text_chunks)
1316 .collect::<Vec<_>>()
1317 .await;
1318 let actual_reindented_text = reindented_chunks
1319 .into_iter()
1320 .collect::<Result<String>>()
1321 .unwrap();
1322 let expected_reindented_text = new_text
1323 .split('\n')
1324 .map(|line| {
1325 if let Some(ix) = line.find(|c| c != ' ') {
1326 let new_indent = cmp::max(0, ix as isize + delta.len()) as usize;
1327 format!("{}{}", " ".repeat(new_indent), &line[ix..])
1328 } else {
1329 line.to_string()
1330 }
1331 })
1332 .collect::<Vec<_>>()
1333 .join("\n");
1334 assert_eq!(actual_reindented_text, expected_reindented_text);
1335 }
1336
1337 fn to_random_chunks(rng: &mut StdRng, input: &str) -> Vec<String> {
1338 let chunk_count = rng.gen_range(1..=cmp::min(input.len(), 50));
1339 let mut chunk_indices = (0..input.len()).choose_multiple(rng, chunk_count);
1340 chunk_indices.sort();
1341 chunk_indices.push(input.len());
1342
1343 let mut chunks = Vec::new();
1344 let mut last_ix = 0;
1345 for chunk_ix in chunk_indices {
1346 chunks.push(input[last_ix..chunk_ix].to_string());
1347 last_ix = chunk_ix;
1348 }
1349 chunks
1350 }
1351
1352 fn simulate_llm_output(
1353 agent: &EditAgent,
1354 output: &str,
1355 rng: &mut StdRng,
1356 cx: &mut TestAppContext,
1357 ) {
1358 let executor = cx.executor();
1359 let chunks = to_random_chunks(rng, output);
1360 let model = agent.model.clone();
1361 cx.background_spawn(async move {
1362 for chunk in chunks {
1363 executor.simulate_random_delay().await;
1364 model
1365 .as_fake()
1366 .send_last_completion_stream_text_chunk(chunk);
1367 }
1368 model.as_fake().end_last_completion_stream();
1369 })
1370 .detach();
1371 }
1372
1373 async fn init_test(cx: &mut TestAppContext) -> EditAgent {
1374 cx.update(settings::init);
1375 cx.update(Project::init_settings);
1376 let project = Project::test(FakeFs::new(cx.executor()), [], cx).await;
1377 let model = Arc::new(FakeLanguageModel::default());
1378 let action_log = cx.new(|_| ActionLog::new(project.clone()));
1379 EditAgent::new(
1380 model,
1381 project,
1382 action_log,
1383 Templates::new(),
1384 EditFormat::XmlTags,
1385 )
1386 }
1387
1388 #[gpui::test(iterations = 10)]
1389 async fn test_non_unique_text_error(cx: &mut TestAppContext, mut rng: StdRng) {
1390 let agent = init_test(cx).await;
1391 let original_text = indoc! {"
1392 function foo() {
1393 return 42;
1394 }
1395
1396 function bar() {
1397 return 42;
1398 }
1399
1400 function baz() {
1401 return 42;
1402 }
1403 "};
1404 let buffer = cx.new(|cx| Buffer::local(original_text, cx));
1405 let (apply, mut events) = agent.edit(
1406 buffer.clone(),
1407 String::new(),
1408 &LanguageModelRequest::default(),
1409 &mut cx.to_async(),
1410 );
1411 cx.run_until_parked();
1412
1413 // When <old_text> matches text in more than one place
1414 simulate_llm_output(
1415 &agent,
1416 indoc! {"
1417 <old_text>
1418 return 42;
1419 }
1420 </old_text>
1421 <new_text>
1422 return 100;
1423 }
1424 </new_text>
1425 "},
1426 &mut rng,
1427 cx,
1428 );
1429 apply.await.unwrap();
1430
1431 // Then the text should remain unchanged
1432 let result_text = buffer.read_with(cx, |buffer, _| buffer.snapshot().text());
1433 assert_eq!(
1434 result_text,
1435 indoc! {"
1436 function foo() {
1437 return 42;
1438 }
1439
1440 function bar() {
1441 return 42;
1442 }
1443
1444 function baz() {
1445 return 42;
1446 }
1447 "},
1448 "Text should remain unchanged when there are multiple matches"
1449 );
1450
1451 // And AmbiguousEditRange even should be emitted
1452 let events = drain_events(&mut events);
1453 let ambiguous_ranges = vec![2..3, 6..7, 10..11];
1454 assert!(
1455 events.contains(&EditAgentOutputEvent::AmbiguousEditRange(ambiguous_ranges)),
1456 "Should emit AmbiguousEditRange for non-unique text"
1457 );
1458 }
1459
1460 fn drain_events(
1461 stream: &mut UnboundedReceiver<EditAgentOutputEvent>,
1462 ) -> Vec<EditAgentOutputEvent> {
1463 let mut events = Vec::new();
1464 while let Ok(Some(event)) = stream.try_next() {
1465 events.push(event);
1466 }
1467 events
1468 }
1469}