Revert terminal memory leak fixes (#23636)

Kirill Bulatov created

New mechanism had introduced the following regressions:

* Windows tasks are not registering child exit properly, sometimes
getting stuck in dirty state (even with
https://github.com/zed-industries/zed/pull/23631):


https://github.com/user-attachments/assets/6d406f17-aa76-4012-9c3b-be72d6d5beae

* Overall, the terminal editing started to feel more sluggish, esp. in
regards to deletions (ctrl-w and backspace), tested on macOS:


https://github.com/user-attachments/assets/3a69fe2e-e394-45e8-8f51-0f5ac396cb24


Release Notes:

- N/A

Change summary

crates/repl/src/outputs/plain.rs |  16 ++
crates/terminal/src/terminal.rs  | 184 ++++++++++-----------------------
2 files changed, 66 insertions(+), 134 deletions(-)

Detailed changes

crates/repl/src/outputs/plain.rs 🔗

@@ -16,7 +16,6 @@
 //!
 
 use alacritty_terminal::{
-    event::VoidListener,
     grid::Dimensions as _,
     index::{Column, Line, Point},
     term::Config,
@@ -25,6 +24,8 @@ use alacritty_terminal::{
 use gpui::{canvas, size, ClipboardItem, FontStyle, Model, TextStyle, WhiteSpace};
 use language::Buffer;
 use settings::Settings as _;
+use std::mem;
+use terminal::ZedListener;
 use terminal_view::terminal_element::TerminalElement;
 use theme::ThemeSettings;
 use ui::{prelude::*, IntoElement};
@@ -49,7 +50,7 @@ pub struct TerminalOutput {
     /// ANSI escape sequence processor for parsing input text.
     parser: Processor,
     /// Alacritty terminal instance that manages the terminal state and content.
-    handler: alacritty_terminal::Term<VoidListener>,
+    handler: alacritty_terminal::Term<ZedListener>,
 }
 
 const DEFAULT_NUM_LINES: usize = 32;
@@ -123,9 +124,14 @@ impl TerminalOutput {
     /// and sets up the necessary components for handling terminal events and rendering.
     ///
     pub fn new(cx: &mut WindowContext) -> Self {
-        let term =
-            alacritty_terminal::Term::new(Config::default(), &terminal_size(cx), VoidListener);
-
+        let (events_tx, events_rx) = futures::channel::mpsc::unbounded();
+        let term = alacritty_terminal::Term::new(
+            Config::default(),
+            &terminal_size(cx),
+            terminal::ZedListener(events_tx.clone()),
+        );
+
+        mem::forget(events_rx);
         Self {
             parser: Processor::new(),
             handler: term,

crates/terminal/src/terminal.rs 🔗

@@ -28,7 +28,7 @@ use anyhow::{bail, Result};
 
 use futures::{
     channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
-    FutureExt, SinkExt,
+    FutureExt,
 };
 
 use mappings::mouse::{
@@ -53,7 +53,7 @@ use std::{
     ops::{Deref, Index, RangeInclusive},
     path::PathBuf,
     sync::Arc,
-    time::{Duration, Instant},
+    time::Duration,
 };
 use thiserror::Error;
 
@@ -482,59 +482,61 @@ impl TerminalBuilder {
         })
     }
 
-    pub fn subscribe(self, cx: &ModelContext<Terminal>) -> Terminal {
-        // Accumulate the effects of events on a background thread in order to keep up with the
-        // events from alacritty even when it is emitting events rapidly.
-        let (mut accumulated_events_tx, mut accumulated_events_rx) = unbounded();
-        let mut events_rx = self.events_rx;
-        let background_executor = cx.background_executor().clone();
-        cx.background_executor()
-            .spawn(async move {
-                while let Some(event) = events_rx.next().await {
-                    // Process the first event immediately to reduce latency
-                    accumulated_events_tx
-                        .send(EventOrAccumulator::Event(event))
-                        .await?;
-                    'outer: loop {
-                        let start_time = Instant::now();
-                        let mut timer = background_executor.timer(Duration::from_millis(4)).fuse();
-                        let mut event_accumulator = EventAccumulator::new();
-                        loop {
-                            futures::select_biased! {
-                                // Events are no longer coming in at a high rate, so go back to just
-                                // awaiting the next event.
-                                _ = timer => break 'outer,
-                                event = events_rx.next() => {
-                                    let Some(event) = event else {
-                                        break;
-                                    };
-                                    event_accumulator.add(event);
-                                    if event_accumulator.events.len() > 100 {
-                                        break;
+    pub fn subscribe(mut self, cx: &ModelContext<Terminal>) -> Terminal {
+        //Event loop
+        cx.spawn(|terminal, mut cx| async move {
+            while let Some(event) = self.events_rx.next().await {
+                terminal.update(&mut cx, |terminal, cx| {
+                    //Process the first event immediately for lowered latency
+                    terminal.process_event(&event, cx);
+                })?;
+
+                'outer: loop {
+                    let mut events = Vec::new();
+                    let mut timer = cx
+                        .background_executor()
+                        .timer(Duration::from_millis(4))
+                        .fuse();
+                    let mut wakeup = false;
+                    loop {
+                        futures::select_biased! {
+                            _ = timer => break,
+                            event = self.events_rx.next() => {
+                                if let Some(event) = event {
+                                    if matches!(event, AlacTermEvent::Wakeup) {
+                                        wakeup = true;
+                                    } else {
+                                        events.push(event);
                                     }
-                                    let elapsed = Instant::now().duration_since(start_time);
-                                    if elapsed > Duration::from_millis(20) {
+
+                                    if events.len() > 100 {
                                         break;
                                     }
-                                },
-                            }
+                                } else {
+                                    break;
+                                }
+                            },
                         }
-                        accumulated_events_tx
-                            .send(EventOrAccumulator::Accumulator(event_accumulator))
-                            .await?;
                     }
-                }
-                anyhow::Ok(())
-            })
-            .detach();
 
-        // On the foreground thread, process the accumulated effects of events.
-        cx.spawn(|terminal, mut cx| async move {
-            while let Some(event_or_accumulator) = accumulated_events_rx.next().await {
-                terminal.update(&mut cx, |terminal, cx| {
-                    event_or_accumulator.process_events(terminal, cx)
-                })?;
+                    if events.is_empty() && !wakeup {
+                        smol::future::yield_now().await;
+                        break 'outer;
+                    }
+
+                    terminal.update(&mut cx, |this, cx| {
+                        if wakeup {
+                            this.process_event(&AlacTermEvent::Wakeup, cx);
+                        }
+
+                        for event in events {
+                            this.process_event(&event, cx);
+                        }
+                    })?;
+                    smol::future::yield_now().await;
+                }
             }
+
             anyhow::Ok(())
         })
         .detach();
@@ -543,83 +545,6 @@ impl TerminalBuilder {
     }
 }
 
-enum EventOrAccumulator {
-    Event(AlacTermEvent),
-    Accumulator(EventAccumulator),
-}
-
-impl EventOrAccumulator {
-    fn process_events(self, terminal: &mut Terminal, cx: &mut ModelContext<Terminal>) {
-        match self {
-            EventOrAccumulator::Event(event) => terminal.process_event(event, cx),
-            EventOrAccumulator::Accumulator(accumulator) => {
-                accumulator.process_events(terminal, cx)
-            }
-        }
-    }
-}
-
-struct EventAccumulator {
-    wakeup: bool,
-    cursor_blinking_changed: bool,
-    bell: bool,
-    title: Option<String>,
-    /// Events that can't be deduplicated.
-    events: Vec<AlacTermEvent>,
-}
-
-impl EventAccumulator {
-    fn new() -> Self {
-        EventAccumulator {
-            wakeup: false,
-            cursor_blinking_changed: false,
-            bell: false,
-            title: None,
-            events: Vec::new(),
-        }
-    }
-
-    fn add(&mut self, event: AlacTermEvent) {
-        match event {
-            // Events that can have their effects deduplicated.
-            AlacTermEvent::Title(title) => self.title = Some(title),
-            AlacTermEvent::ResetTitle => self.title = Some(String::new()),
-            AlacTermEvent::CursorBlinkingChange => self.cursor_blinking_changed = true,
-            AlacTermEvent::Wakeup => self.wakeup = true,
-            AlacTermEvent::Bell => self.bell = true,
-            // Events that have handlers involving writing text to the terminal or interacting with
-            // clipboard, and so must be kept in order.
-            AlacTermEvent::ClipboardStore(_, _) => self.events.push(event),
-            AlacTermEvent::ClipboardLoad(_, _) => self.events.push(event),
-            AlacTermEvent::PtyWrite(_) => self.events.push(event),
-            AlacTermEvent::TextAreaSizeRequest(_) => self.events.push(event),
-            AlacTermEvent::ColorRequest(_, _) => self.events.push(event),
-            AlacTermEvent::Exit => self.events.push(event),
-            AlacTermEvent::ChildExit(_) => self.events.push(event),
-            // Handled in render so no need to handle here.
-            AlacTermEvent::MouseCursorDirty => {}
-        }
-    }
-
-    fn process_events(self, terminal: &mut Terminal, cx: &mut ModelContext<Terminal>) {
-        if self.wakeup {
-            terminal.process_event(AlacTermEvent::Wakeup, cx);
-        }
-        if self.cursor_blinking_changed {
-            terminal.process_event(AlacTermEvent::CursorBlinkingChange, cx);
-        }
-        if self.bell {
-            terminal.process_event(AlacTermEvent::Bell, cx);
-        }
-        if let Some(title) = self.title {
-            terminal.process_event(AlacTermEvent::Title(title), cx);
-        }
-        for event in self.events {
-            terminal.process_event(event, cx);
-        }
-    }
-}
-
 #[derive(Debug, Clone, Deserialize, Serialize)]
 pub struct IndexedCell {
     pub point: AlacPoint,
@@ -749,7 +674,7 @@ impl TaskStatus {
 }
 
 impl Terminal {
-    fn process_event(&mut self, event: AlacTermEvent, cx: &mut ModelContext<Self>) {
+    fn process_event(&mut self, event: &AlacTermEvent, cx: &mut ModelContext<Self>) {
         match event {
             AlacTermEvent::Title(title) => {
                 self.breadcrumb_text = title.to_string();
@@ -803,12 +728,13 @@ impl Terminal {
                 // Instead of locking, we could store the colors in `self.last_content`. But then
                 // we might respond with out of date value if a "set color" sequence is immediately
                 // followed by a color request sequence.
-                let color = self.term.lock().colors()[index]
-                    .unwrap_or_else(|| to_alac_rgb(get_color_at_index(index, cx.theme().as_ref())));
+                let color = self.term.lock().colors()[*index].unwrap_or_else(|| {
+                    to_alac_rgb(get_color_at_index(*index, cx.theme().as_ref()))
+                });
                 self.write_to_pty(format(color));
             }
             AlacTermEvent::ChildExit(error_code) => {
-                self.register_task_finished(Some(error_code), cx);
+                self.register_task_finished(Some(*error_code), cx);
             }
         }
     }