@@ -1,10 +1,7 @@
use std::{
collections::{HashSet, VecDeque},
fmt::Display,
- sync::{
- Arc,
- atomic::{AtomicBool, Ordering},
- },
+ sync::Arc,
};
use agent_client_protocol::schema as acp;
@@ -162,30 +159,17 @@ struct RawStreamLine {
/// can publish transport and stderr lines without knowing anything about
/// the logs panel's channel.
///
-/// The tap carries a shared `enabled` flag that the registry flips on when
-/// the first observer subscribes. Until then, `emit_*` methods are
-/// effectively free: they check an atomic and return. This keeps the
-/// logs panel's memory footprint opt-in — if no one ever opens it, the
-/// transport never allocates a line or pushes a channel item.
+/// Every line is buffered into the registry's ring, so opening the ACP logs
+/// panel after the fact still shows history. The steady-state cost is
+/// negligible compared to the JSON-RPC serialization that already happened
+/// to produce the line.
#[derive(Clone)]
pub struct AcpLogTap {
- enabled: Arc<AtomicBool>,
sender: smol::channel::Sender<RawStreamLine>,
}
impl AcpLogTap {
- fn is_enabled(&self) -> bool {
- self.enabled.load(Ordering::Relaxed)
- }
-
- fn enable(&self) {
- self.enabled.store(true, Ordering::Relaxed);
- }
-
fn emit(&self, direction: StreamMessageDirection, line: &str) {
- if !self.is_enabled() {
- return;
- }
self.sender
.try_send(RawStreamLine {
direction,
@@ -225,9 +209,6 @@ pub struct AcpConnectionRegistry {
/// When a new connection is set, this is cleared.
backlog: VecDeque<StreamMessage>,
subscribers: Vec<smol::channel::Sender<StreamMessage>>,
- /// The tap handed to the currently active connection, so the registry
- /// can flip its `enabled` flag the first time someone subscribes.
- active_tap: Option<AcpLogTap>,
_broadcast_task: Option<Task<()>>,
}
@@ -245,26 +226,21 @@ impl AcpConnectionRegistry {
/// Register a new active connection and return an [`AcpLogTap`] that
/// the connection should hand to its transport + stderr readers.
///
- /// The tap starts out disabled: transport lines are dropped cheaply
- /// until someone subscribes via [`Self::subscribe`], at which point
- /// the tap is flipped on and subsequent lines are broadcast to all
- /// current and future subscribers.
+ /// The tap begins capturing immediately so that opening the ACP logs
+ /// panel after something has already gone wrong still shows the
+ /// leading history (up to [`MAX_BACKLOG_MESSAGES`]).
pub fn set_active_connection(
&mut self,
agent_id: AgentId,
cx: &mut Context<Self>,
) -> AcpLogTap {
let (sender, raw_rx) = smol::channel::unbounded::<RawStreamLine>();
- let tap = AcpLogTap {
- enabled: Arc::new(AtomicBool::new(false)),
- sender,
- };
+ let tap = AcpLogTap { sender };
self.active_agent_id = Some(agent_id);
self.generation += 1;
self.backlog.clear();
self.subscribers.clear();
- self.active_tap = Some(tap.clone());
self._broadcast_task = Some(cx.spawn(async move |this, cx| {
while let Ok(raw) = raw_rx.recv().await {
@@ -292,7 +268,6 @@ impl AcpConnectionRegistry {
this.update(cx, |this, cx| {
this.active_agent_id = None;
this.subscribers.clear();
- this.active_tap = None;
cx.notify();
})
.log_err();
@@ -317,15 +292,7 @@ impl AcpConnectionRegistry {
/// a receiver for new messages. The caller is responsible for flushing the
/// backlog into its local state before draining the receiver, so that no
/// messages are dropped between the snapshot and live subscription.
- ///
- /// The first subscription enables the connection's log tap; prior
- /// messages are therefore not available. This is intentional: the tap
- /// is opt-in so that the default case (no one ever opens the ACP logs
- /// panel) performs zero per-message bookkeeping.
pub fn subscribe(&mut self) -> (Vec<StreamMessage>, smol::channel::Receiver<StreamMessage>) {
- if let Some(tap) = &self.active_tap {
- tap.enable();
- }
let backlog = self.backlog.iter().cloned().collect();
let (sender, receiver) = smol::channel::unbounded();
self.subscribers.push(sender);