neuralwatt-mcr.ts

  1import type { ExtensionAPI, ExtensionContext } from "@mariozechner/pi-coding-agent";
  2import * as fs from "node:fs";
  3import * as path from "node:path";
  4import * as os from "node:os";
  5import { randomUUID } from "node:crypto";
  6
  7// Bump on any user-facing behaviour change. Surfaced in the extension log so a
  8// session's behaviour can be tied to a specific revision when triaging reports.
  9//   2.0.0 — honest in-flight chip (tools#33 / inference_frontend#3954):
 10//           neutral "working…" label, silent grace window, tightened MCR gating.
 11//   2.1.0 — send the extension version on the wire as X-NW-MCR-Ext-Version so
 12//           the gateway can log which client revision served a request (server
 13//           logs previously had no way to tell a user's extension version).
 14const EXTENSION_VERSION = "2.1.0";
 15
 16const MCR_ANCHOR_USER_MESSAGES = 3;
 17
 18const LOG_FILE = path.join(
 19  os.homedir(),
 20  ".pi",
 21  "agent",
 22  "extensions",
 23  "neuralwatt-mcr.log",
 24);
 25
 26function nwlog(event: string, data: Record<string, unknown> = {}): void {
 27  try {
 28    const line =
 29      JSON.stringify({ ts: new Date().toISOString(), event, ...data }) + "\n";
 30    fs.appendFileSync(LOG_FILE, line);
 31  } catch {
 32    // never break the extension on logging failure
 33  }
 34}
 35
 36interface MCRMetadata {
 37  session_fp: string;
 38  stored_through: number;
 39  safe_drop_before: number;
 40}
 41
 42interface EnergyData {
 43  energy_joules: number;
 44  mcr?: {
 45    session_turns: number;
 46    context_tokens: number;
 47    compaction_triggered: boolean;
 48    apc_hit_rate?: number;
 49    mcr_compacted_tokens?: number;
 50    mcr_original_tokens?: number;
 51  };
 52}
 53
 54interface SessionState {
 55  sessionFp: string | null;
 56  safeDropBefore: number;
 57  storedThrough: number;
 58  totalEnergyJoules: number;
 59  sessionTurns: number;
 60  contextTokens: number;
 61  lastMcrMeta: MCRMetadata | null;
 62  lastEnergy: EnergyData | null;
 63  // In-flight request UX. When an MCR request is sent we record the wall-clock
 64  // send time so the chip can surface a neutral "working…" indicator on long
 65  // waits, and cleared on the first ``message_update`` / ``message_end``.
 66  //
 67  // HONESTY NOTE (tools#33 / inference_frontend#3954): this is a wall-clock
 68  // proxy, NOT a real compaction signal. The extension cannot observe whether
 69  // the gateway is actually compacting — see ``markRequestSent`` for why Pi's
 70  // API doesn't surface the gateway's ``mcr-status`` SSE frames. So the chip
 71  // must NOT claim "optimizing context"; it can only honestly say a request is
 72  // in flight. We also stay silent for the first few seconds so ordinary turns
 73  // (the ~96% with no compaction) show nothing at all.
 74  inFlightSince: number | null;
 75  inFlightTickerHandle: NodeJS.Timeout | null;
 76}
 77
 78const state: SessionState = {
 79  sessionFp: null,
 80  safeDropBefore: 0,
 81  storedThrough: 0,
 82  totalEnergyJoules: 0,
 83  sessionTurns: 0,
 84  contextTokens: 0,
 85  lastMcrMeta: null,
 86  lastEnergy: null,
 87  inFlightSince: null,
 88  inFlightTickerHandle: null,
 89};
 90
 91// How often to refresh the chip while the in-flight indicator is showing, so
 92// the elapsed counter advances visibly. 500ms is fast enough to feel live but
 93// well below the refresh budget of Pi's footer.
 94const IN_FLIGHT_TICK_MS = 500;
 95
 96// Honesty grace window (tools#33 / inference_frontend#3954): suppress the
 97// in-flight indicator for the first few seconds of every request. Large MCR
 98// prompts (100k+ tokens) have a naturally long prefill/TTFT — 10-60s — on
 99// EVERY turn, with no compaction happening on the ~96% that don't compact.
100// The old chip labelled that ordinary latency "optimizing context…" for the
101// whole window, which read as "MCR is making every prompt slow" and drove
102// churn. We can't tell prefill from compaction (no SSE phase signal reaches
103// the extension), so the only honest move is: say nothing until the wait is
104// long enough that the user genuinely wants reassurance the model isn't hung,
105// and even then use a neutral label that doesn't assert MCR is doing work.
106const IN_FLIGHT_GRACE_MS = 6000;
107
108// Recognise ONLY the MCR-backed aliases. The MCR pipeline (server-side
109// compaction + 1M virtual context + the X-MCR-* response protocol) runs only
110// for the `neuralwatt/…-long` aliases; the base-model and fast/flex IDs route
111// straight to the provider with no compaction.
112//
113// tools#33: the earlier predicate also matched `zai-org/`, `moonshotai/`,
114// `glm-5`, and `kimi-k2`, so every fast/flex alias and direct base-model call
115// lit the in-flight chip and ran the context-drop / fp handlers — none of which
116// apply off-MCR. Nico reported "optimizing context" on non-long models. The
117// client always selects the alias (never the forwarded base name), so matching
118// the alias shape is sufficient and correct. This narrows every call site
119// (chip gating, context-drop, fp-set, compaction-suppression) to MCR-only,
120// which is the intended behaviour for all of them.
121function isMCRModel(modelId: string): boolean {
122  return modelId.includes("neuralwatt/") || modelId.endsWith("-long");
123}
124
125function extractMCRFromHeaders(
126  headers: Record<string, string>,
127): MCRMetadata | null {
128  const fp = headers["x-mcr-session-fp"];
129  if (!fp) return null;
130  return {
131    session_fp: fp,
132    stored_through: parseInt(headers["x-mcr-stored-through"] || "0", 10),
133    safe_drop_before: parseInt(
134      headers["x-mcr-safe-drop-before"] || "0",
135      10,
136    ),
137  };
138}
139
140// Parse a header value as an integer if present, else null. We keep null
141// distinct from 0 because:
142//   * absent header  -> gateway did not run the M1.b ref-recovery code path
143//     (older deploy, non-MCR request, or path bailed before emitting)
144//   * 0              -> ref-recovery ran but had nothing to surface
145// This distinction is the whole point of issue #3371 follow-up.
146function parseOptionalIntHeader(
147  headers: Record<string, string>,
148  name: string,
149): number | null {
150  const raw = headers[name];
151  if (raw === undefined || raw === null || raw === "") return null;
152  const n = parseInt(raw, 10);
153  return Number.isFinite(n) ? n : null;
154}
155
156function extractMCRFromBody(
157  body: Record<string, unknown>,
158): MCRMetadata | null {
159  const mcr = body.mcr as Record<string, unknown> | undefined;
160  if (!mcr || typeof mcr !== "object" || typeof mcr.session_fp !== "string")
161    return null;
162  return {
163    session_fp: mcr.session_fp as string,
164    stored_through:
165      typeof mcr.stored_through === "number" ? mcr.stored_through : 0,
166    safe_drop_before:
167      typeof mcr.safe_drop_before === "number" ? mcr.safe_drop_before : 0,
168  };
169}
170
171function extractEnergyFromBody(
172  body: Record<string, unknown>,
173): EnergyData | null {
174  const energy = body.energy as Record<string, unknown> | undefined;
175  if (!energy || typeof energy !== "object") return null;
176  const result: EnergyData = {
177    energy_joules:
178      typeof energy.energy_joules === "number" ? energy.energy_joules : 0,
179  };
180  if (energy.mcr && typeof energy.mcr === "object") {
181    const m = energy.mcr as Record<string, unknown>;
182    result.mcr = {
183      session_turns:
184        typeof m.session_turns === "number" ? m.session_turns : 0,
185      context_tokens:
186        typeof m.context_tokens === "number" ? m.context_tokens : 0,
187      compaction_triggered:
188        typeof m.compaction_triggered === "boolean"
189          ? m.compaction_triggered
190          : false,
191      apc_hit_rate:
192        typeof m.apc_hit_rate === "number" ? m.apc_hit_rate : undefined,
193      mcr_compacted_tokens:
194        typeof m.mcr_compacted_tokens === "number"
195          ? m.mcr_compacted_tokens
196          : undefined,
197      mcr_original_tokens:
198        typeof m.mcr_original_tokens === "number"
199          ? m.mcr_original_tokens
200          : undefined,
201    };
202  }
203  return result;
204}
205
206function formatEnergy(joules: number): string {
207  if (joules < 1) return `${(joules * 1000).toFixed(0)}mJ`;
208  if (joules < 1000) return `${joules.toFixed(1)}J`;
209  return `${(joules / 1000).toFixed(2)}kJ`;
210}
211
212// Render an in-flight elapsed-time stamp for the chip. Short formats (<10s →
213// "1.4s", >=10s → "12s", >=60s → "1m 5s") keep the chip from growing wide
214// enough to push other footer widgets off-screen.
215function formatElapsed(ms: number): string {
216  if (ms < 10000) return `${(ms / 1000).toFixed(1)}s`;
217  const seconds = Math.floor(ms / 1000);
218  if (seconds < 60) return `${seconds}s`;
219  const minutes = Math.floor(seconds / 60);
220  return `${minutes}m ${seconds % 60}s`;
221}
222
223// Extract the role/type marker from an entry. Pi's outbound HTTP payload
224// uses OpenAI-shape ``role`` (``user``/``assistant``/``tool``/``system``);
225// Pi's internal session-log records sometimes serialize the same field as
226// ``type``. Read both so this extension works regardless of which shape
227// the agent-runtime hands us via the ``context`` hook event.
228function entryRole(entry: { role?: string; type?: string }): string | undefined {
229  return entry.role ?? entry.type;
230}
231
232// Returns the FULL message index of the Nth user message (matching the
233// indexing space of the server's `safe_drop_before`, which counts every
234// message — user/assistant/tool/system — in send order). The earlier
235// version returned a user+assistant-subset index, which mixed index
236// spaces with `safe_drop_before` in `computeDropRange` and caused
237// `context_drop` to silently nuke the most-recent user prompts when the
238// upper bound exceeded the user+assistant subset size.
239function findAnchorFloor(
240  entries: Array<{ role?: string; type?: string }>,
241  nAnchors: number,
242): number {
243  let userCount = 0;
244  for (let i = 0; i < entries.length; i++) {
245    const role = entryRole(entries[i]);
246    if (role === "user") {
247      userCount++;
248      if (userCount === nAnchors) return i;
249    }
250  }
251  return -1;
252}
253
254// Server-side validation for X-NW-Conversation-ID (mirrors
255// services/mcr_v3_session.py::validate_client_conversation_id):
256//
257//   * non-empty, ≤ 256 chars
258//   * all code points printable (no control / whitespace beyond plain space)
259//
260// We sanitize here so a malformed Pi session id can never bounce off the
261// server as HTTP 400 — the worst case is we fall back to the in-process UUID.
262const MAX_CONVERSATION_ID_LEN = 256;
263
264function isWellFormedConversationId(value: string): boolean {
265  if (!value || value.length === 0 || value.length > MAX_CONVERSATION_ID_LEN) {
266    return false;
267  }
268  // Reject any control character (matches server's `not c.isprintable()` rule).
269  // We allow plain ASCII space (0x20) and everything ≥ 0x20 except 0x7F (DEL).
270  for (let i = 0; i < value.length; i++) {
271    const code = value.charCodeAt(i);
272    if (code < 0x20 || code === 0x7f) return false;
273  }
274  return true;
275}
276
277// Per-process fallback id. Generated lazily so it stays stable across
278// auto-compact within one `pi` invocation, and differs across invocations.
279let uuidFallback: string | null = null;
280function getUuidFallback(): string {
281  if (!uuidFallback) {
282    uuidFallback = randomUUID();
283  }
284  return uuidFallback;
285}
286
287type ConversationIdSource = "pi-session" | "uuid-fallback";
288
289function resolveConversationId(
290  ctx: ExtensionContext,
291): { id: string; source: ConversationIdSource } {
292  // Preferred source: Pi's own session id. Stable across auto-compact in a
293  // single `pi` session, distinct across sessions, naturally string-form.
294  try {
295    const piSessionId = ctx.sessionManager?.getSessionId?.();
296    if (
297      typeof piSessionId === "string" &&
298      isWellFormedConversationId(piSessionId)
299    ) {
300      return { id: piSessionId, source: "pi-session" };
301    }
302  } catch {
303    // fall through to uuid fallback
304  }
305  return { id: getUuidFallback(), source: "uuid-fallback" };
306}
307
308function computeDropRange(
309  entries: Array<{ role?: string; type?: string }>,
310  safeDropBefore: number,
311): [number, number] {
312  if (safeDropBefore <= 0) return [0, 0];
313  const anchorIdx = findAnchorFloor(entries, MCR_ANCHOR_USER_MESSAGES);
314  if (anchorIdx < 0) return [0, 0];
315  const dropStart = anchorIdx + 1;
316  const dropEnd = safeDropBefore;
317  if (dropEnd <= dropStart) return [0, 0];
318  return [dropStart, dropEnd];
319}
320
321export default function (pi: ExtensionAPI) {
322  const MCR_STATUS_KEY = "nw-mcr";
323  const ENERGY_STATUS_KEY = "nw-energy";
324
325  // ── Outbound header wiring (X-NW-Conversation-ID, X-NW-MCR-Ext-Version) ──
326  // pi-coding-agent's `before_provider_request` is a *body* hook — the
327  // earlier `payload.headers[...]` mutation reached extension memory only,
328  // never the HTTP wire. The documented per-request header path is
329  // `pi.registerProvider({ headers })`, whose values are env-var NAMES that
330  // the SDK re-reads from `process.env` on every stream
331  // (dist/core/resolve-config-value.js). Net: real HTTP headers, no body
332  // touch, no APC impact. Both headers below ride this same mechanism.
333  //
334  // Boot order: we seed the env var with a UUID at extension load so any
335  // request fired before the first `before_provider_request` tick still
336  // carries *some* id. The hook upgrades it to Pi's stable per-invocation
337  // session id (see resolveConversationId). Subsequent requests in the
338  // same `pi` invocation reuse the upgraded value — invocation-stable
339  // session_fp by construction.
340  const CONV_ID_ENV = "X_NW_CONVERSATION_ID";
341  if (!process.env[CONV_ID_ENV]) {
342    process.env[CONV_ID_ENV] = getUuidFallback();
343  }
344  // X-NW-MCR-Ext-Version (2.1.0): surface the client extension
345  // version on the wire so the gateway can log which revision served a
346  // request — server logs previously had no way to tell a user's version.
347  // Unlike the conversation id, the version is static for the life of the
348  // process (it never changes at runtime), so we seed it once at load and
349  // never touch it again — no upgrade-on-hook logic needed.
350  const EXT_VERSION_ENV = "X_NW_MCR_EXT_VERSION";
351  process.env[EXT_VERSION_ENV] = EXTENSION_VERSION;
352  // `apiKey` mirrors the env-var name from ~/.pi/agent/models.json so the
353  // partial config doesn't shadow the existing auth. `storeProviderRequestConfig`
354  // doesn't merge with the models.json-derived entry (different map), so we
355  // have to re-state any field we need; only apiKey here, since baseUrl/api
356  // flow through via the override-only branch of applyProviderConfig.
357  pi.registerProvider("neuralwatt", {
358    apiKey: "NEURALWATT_API_KEY",
359    headers: {
360      "X-NW-Conversation-ID": CONV_ID_ENV,
361      "X-NW-MCR-Ext-Version": EXT_VERSION_ENV,
362    },
363  });
364
365  function updateStatusBar(ctx: ExtensionContext) {
366    // In-flight indicator (tools#33 / inference_frontend#3954). HONESTY RULES:
367    //
368    //  * The extension cannot observe whether the gateway is compacting (Pi
369    //    doesn't surface the ``mcr-status`` SSE frames — see markRequestSent).
370    //    So we NEVER claim "optimizing context"; we only ever say a request is
371    //    in flight ("working…").
372    //  * We stay completely silent for the first IN_FLIGHT_GRACE_MS so normal
373    //    turns — including the long-but-uncompacted prefill that dominates real
374    //    usage — show nothing. Only a genuinely long wait surfaces the neutral
375    //    reassurance that the model hasn't hung.
376    //  * Once real model output starts, ``markStreamProducing`` clears
377    //    ``inFlightSince`` and the chip reverts to the standard MCR view.
378    const inFlightElapsedMs =
379      state.inFlightSince !== null ? Date.now() - state.inFlightSince : 0;
380    if (
381      state.inFlightSince !== null &&
382      inFlightElapsedMs >= IN_FLIGHT_GRACE_MS
383    ) {
384      const fpPrefix = state.sessionFp
385        ? `MCR ${state.sessionFp.slice(0, 8)} | `
386        : "MCR | ";
387      ctx.ui.setStatus(
388        MCR_STATUS_KEY,
389        `${fpPrefix}working… ${formatElapsed(inFlightElapsedMs)}`,
390      );
391    } else if (state.sessionFp) {
392      const parts: string[] = [`MCR ${state.sessionFp.slice(0, 8)}`];
393      if (state.safeDropBefore > 0) {
394        parts.push(`drop<${state.safeDropBefore}`);
395      }
396      ctx.ui.setStatus(MCR_STATUS_KEY, parts.join(" | "));
397    } else {
398      ctx.ui.setStatus(MCR_STATUS_KEY, "");
399    }
400
401    if (state.totalEnergyJoules > 0) {
402      const parts: string[] = [`${formatEnergy(state.totalEnergyJoules)}`];
403      if (state.lastEnergy?.mcr) {
404        const m = state.lastEnergy.mcr;
405        if (m.apc_hit_rate !== undefined) {
406          parts.push(`APC ${(m.apc_hit_rate * 100).toFixed(0)}%`);
407        }
408        if (m.mcr_compacted_tokens && m.mcr_original_tokens) {
409          const ratio = m.mcr_compacted_tokens / m.mcr_original_tokens;
410          parts.push(`compact ${(ratio * 100).toFixed(0)}%`);
411        }
412      }
413      ctx.ui.setStatus(ENERGY_STATUS_KEY, parts.join(" | "));
414    } else {
415      ctx.ui.setStatus(ENERGY_STATUS_KEY, "");
416    }
417  }
418
419  // Lifecycle helpers for the in-flight indicator. We bracket the wait window
420  // with ``markRequestSent`` (on before_provider_request) and
421  // ``markStreamProducing`` (on the first message_update / message_end that
422  // carries real model output). The elapsed counter advances via a
423  // ``setInterval`` that re-calls ``updateStatusBar`` every ~0.5s.
424  //
425  // ── WHY THIS IS A NEUTRAL "working…" PROXY, NOT A REAL MCR PHASE SIGNAL ──
426  //
427  // The gateway (inference_frontend #3916) emits the ground truth as
428  // ``event: mcr-status`` SSE frames carrying {phase: compacting | warming |
429  // idle, elapsed_ms}. The ideal chip would show "optimizing context…" ONLY
430  // while a ``compacting`` phase is live. That is NOT achievable from a Pi
431  // extension on the version this targets (Pi v0.72/0.73), verified against
432  // the published type defs:
433  //
434  //   1. The only stream hook is ``message_update``, whose payload is
435  //      ``assistantMessageEvent: AssistantMessageEvent`` (pi-ai types.d.ts).
436  //      That union is closed to text/thinking/toolcall/start/done/error —
437  //      there is no member that can carry a raw ``mcr-status`` frame.
438  //   2. There is no "raw SSE frame" / "stream event" hook anywhere on
439  //      ``ExtensionAPI`` (pi-coding-agent extensions/types.d.ts): the event
440  //      surface is session/agent/turn/message/tool lifecycle only.
441  //   3. The neuralwatt provider streams through pi-ai's openai-completions
442  //      handler, which consumes the response via the official ``openai`` SDK
443  //      (``client.chat.completions.create(...).withResponse()``). The SDK's
444  //      decoder yields only typed ``chat.completion.chunk`` objects; any SSE
445  //      frame with ``event: mcr-status`` is dropped by the SDK before pi-ai —
446  //      and therefore the extension — can ever see it.
447  //
448  // So the extension genuinely cannot tell prefill from compaction. Claiming
449  // "optimizing context" would be a lie on the ~96% of turns where nothing is
450  // compacted (the churn driver in #3954). The honest behaviour is: neutral
451  // "working…" label, only after a grace window. Path A (real phase-driven
452  // chip) is blocked on an upstream Pi capability: a hook that surfaces raw
453  // provider SSE events (or an OpenAI-compat passthrough for unknown event
454  // types) to extensions. Track that as a Pi feature request; revisit here
455  // once it ships.
456  function markRequestSent(ctx: ExtensionContext) {
457    state.inFlightSince = Date.now();
458    if (state.inFlightTickerHandle !== null) {
459      clearInterval(state.inFlightTickerHandle);
460    }
461    state.inFlightTickerHandle = setInterval(() => {
462      // Defensive: stop ticking if the in-flight flag was cleared
463      // out-of-band (e.g. session_start reset).
464      if (state.inFlightSince === null) {
465        if (state.inFlightTickerHandle !== null) {
466          clearInterval(state.inFlightTickerHandle);
467          state.inFlightTickerHandle = null;
468        }
469        return;
470      }
471      updateStatusBar(ctx);
472    }, IN_FLIGHT_TICK_MS);
473    updateStatusBar(ctx);
474  }
475
476  function markStreamProducing(ctx: ExtensionContext) {
477    if (state.inFlightSince === null) return;
478    state.inFlightSince = null;
479    if (state.inFlightTickerHandle !== null) {
480      clearInterval(state.inFlightTickerHandle);
481      state.inFlightTickerHandle = null;
482    }
483    updateStatusBar(ctx);
484  }
485
486  // Upgrade X_NW_CONVERSATION_ID to Pi's stable session-id as early as
487  // possible — `session_start` fires before any provider request and is the
488  // earliest hook with ctx. Without this, the first request of each fresh
489  // `pi -p` invocation would carry the boot-time UUID (different per
490  // process) and produce a cold-cache session_fp that drags APC averages
491  // down on `--continue` chains.
492  pi.on("session_start", async (_event, ctx) => {
493    const { id: conversationId } = resolveConversationId(ctx);
494    process.env[CONV_ID_ENV] = conversationId;
495  });
496
497  pi.on("after_provider_response", async (event, ctx) => {
498    const modelId = ctx.model?.id || "";
499    if (!isMCRModel(modelId)) return;
500
501    const headers = event.headers as Record<string, string>;
502    const mcrFromHeaders = extractMCRFromHeaders(headers);
503
504    nwlog("after_provider_response", {
505      model: modelId,
506      header_fp: headers["x-mcr-session-fp"] ?? null,
507      header_safe_drop_before: headers["x-mcr-safe-drop-before"] ?? null,
508      header_stored_through: headers["x-mcr-stored-through"] ?? null,
509      parsed: mcrFromHeaders,
510    });
511
512    // M1.b ref-recovery observability (issue #3371 follow-up). The
513    // gateway's recent_ref_expansion path runs on the gateway->backend
514    // payload, which is invisible to Pi — without these headers in the
515    // log we can't tell "ref-recovery ran with no refs available" apart
516    // from "ref-recovery never ran". Counts only, no preview/sha values.
517    // Server-side emission: API_Gateway/app/services/mcr_anthropic_native_proxy.py::_add_mcr_refs_headers
518    const refsRecovered = parseOptionalIntHeader(headers, "x-mcr-refs-recovered");
519    const refsInForward = parseOptionalIntHeader(headers, "x-mcr-refs-in-forward");
520    const refsSkippedBudget = parseOptionalIntHeader(
521      headers,
522      "x-mcr-refs-skipped-budget",
523    );
524    const refsSkippedMissing = parseOptionalIntHeader(
525      headers,
526      "x-mcr-refs-skipped-missing",
527    );
528    const recoveryTokensAdded = parseOptionalIntHeader(
529      headers,
530      "x-mcr-recovery-tokens-added",
531    );
532    const manifestEntries = parseOptionalIntHeader(
533      headers,
534      "x-mcr-manifest-entries",
535    );
536
537    if (
538      refsRecovered !== null ||
539      refsInForward !== null ||
540      refsSkippedBudget !== null ||
541      refsSkippedMissing !== null ||
542      recoveryTokensAdded !== null ||
543      manifestEntries !== null
544    ) {
545      nwlog("mcr_refs", {
546        refs_recovered: refsRecovered,
547        refs_in_forward: refsInForward,
548        refs_skipped_budget: refsSkippedBudget,
549        refs_skipped_missing: refsSkippedMissing,
550        recovery_tokens_added: recoveryTokensAdded,
551        manifest_entries: manifestEntries,
552      });
553    }
554
555    if (mcrFromHeaders) {
556      state.sessionFp = mcrFromHeaders.session_fp;
557      state.safeDropBefore = mcrFromHeaders.safe_drop_before;
558      state.storedThrough = mcrFromHeaders.stored_through;
559      state.lastMcrMeta = mcrFromHeaders;
560    }
561
562    updateStatusBar(ctx);
563  });
564
565  // Clear the in-flight indicator on the first message_update for an MCR
566  // model — at that point real model tokens are flowing, the "is it hung?"
567  // perception is gone, and the chip should revert to the standard
568  // MCR-fingerprint view. The MessageUpdateEvent fires for any assistant
569  // streaming update (text/thinking/toolcall deltas); we don't need to narrow
570  // further since any of these prove the wait is over.
571  pi.on("message_update", async (event, ctx) => {
572    if (event.message.role !== "assistant") return;
573    if (!isMCRModel(ctx.model?.id || "")) return;
574    markStreamProducing(ctx);
575  });
576
577  pi.on("message_end", async (event, ctx) => {
578    if (event.message.role !== "assistant") return;
579    if (!isMCRModel(ctx.model?.id || "")) return;
580
581    // Backstop — if a response was short enough that no message_update ever
582    // fired, message_end is the latest possible chance to clear the indicator
583    // before it gets stale.
584    markStreamProducing(ctx);
585
586    const msg = event.message as Record<string, unknown>;
587
588    const mcrFromBody = extractMCRFromBody(msg);
589    if (mcrFromBody) {
590      nwlog("message_end_mcr_body", { parsed: mcrFromBody });
591      state.sessionFp = mcrFromBody.session_fp;
592      state.safeDropBefore = mcrFromBody.safe_drop_before;
593      state.storedThrough = mcrFromBody.stored_through;
594      state.lastMcrMeta = mcrFromBody;
595    }
596
597    const energy = extractEnergyFromBody(msg);
598    if (energy) {
599      nwlog("message_end_energy", {
600        energy_joules: energy.energy_joules,
601        cumulative_joules: state.totalEnergyJoules + energy.energy_joules,
602        session_turns: energy.mcr?.session_turns,
603        context_tokens: energy.mcr?.context_tokens,
604        compaction_triggered: energy.mcr?.compaction_triggered,
605        apc_hit_rate: energy.mcr?.apc_hit_rate,
606        mcr_compacted_tokens: energy.mcr?.mcr_compacted_tokens,
607        mcr_original_tokens: energy.mcr?.mcr_original_tokens,
608      });
609      state.totalEnergyJoules += energy.energy_joules;
610      state.lastEnergy = energy;
611      pi.appendEntry("neuralwatt-energy", { energy_joules: energy.energy_joules });
612      if (energy.mcr) {
613        state.sessionTurns = energy.mcr.session_turns;
614        state.contextTokens = energy.mcr.context_tokens;
615      }
616    }
617
618    updateStatusBar(ctx);
619  });
620
621  pi.on("context", async (event, ctx) => {
622    const modelId = ctx.model?.id || "";
623    const numMsgs = event.messages.length;
624
625    if (!state.sessionFp) {
626      nwlog("context_skip", {
627        reason: "no_session_fp",
628        model: modelId,
629        num_msgs: numMsgs,
630      });
631      return;
632    }
633    if (state.safeDropBefore <= 0) {
634      nwlog("context_skip", {
635        reason: "safe_drop_before_zero",
636        model: modelId,
637        num_msgs: numMsgs,
638        safe_drop_before: state.safeDropBefore,
639        session_fp: state.sessionFp,
640      });
641      return;
642    }
643    if (!isMCRModel(modelId)) {
644      nwlog("context_skip", {
645        reason: "not_mcr_model",
646        model: modelId,
647        num_msgs: numMsgs,
648      });
649      return;
650    }
651
652    const [dropStart, dropEnd] = computeDropRange(
653      event.messages as Array<{ type: string }>,
654      state.safeDropBefore,
655    );
656
657    if (dropEnd <= dropStart) {
658      nwlog("context_no_drop", {
659        reason: "empty_range",
660        drop_start: dropStart,
661        drop_end: dropEnd,
662        safe_drop_before: state.safeDropBefore,
663        num_msgs: numMsgs,
664        session_fp: state.sessionFp,
665      });
666      return;
667    }
668
669    // Drop messages in the full-index range [dropStart, dropEnd). Both
670    // bounds are in the same indexing space as `event.messages` and as the
671    // server's `safe_drop_before` (every message counted, all roles). The
672    // earlier version maintained a separate user+assistant-subset counter
673    // and compared it against full-index bounds, which silently nuked the
674    // most-recent user prompts (#bug discovered 2026-05-16 — see commit
675    // message for the trace).
676    const clampedEnd = Math.min(dropEnd, event.messages.length);
677    const filtered = event.messages.filter(
678      (_: unknown, i: number) => i < dropStart || i >= clampedEnd,
679    );
680    const droppedCount = numMsgs - filtered.length;
681
682    if (droppedCount === 0) {
683      nwlog("context_no_drop", {
684        reason: "no_indices_matched",
685        drop_start: dropStart,
686        drop_end: clampedEnd,
687        safe_drop_before: state.safeDropBefore,
688        num_msgs: numMsgs,
689        session_fp: state.sessionFp,
690      });
691      return;
692    }
693
694    nwlog("context_drop", {
695      drop_start: dropStart,
696      drop_end: clampedEnd,
697      safe_drop_before: state.safeDropBefore,
698      num_msgs_before: numMsgs,
699      num_msgs_after: filtered.length,
700      dropped: droppedCount,
701      session_fp: state.sessionFp,
702    });
703
704    return { messages: filtered };
705  });
706
707  pi.on("before_provider_request", async (event, ctx) => {
708    if (!isMCRModel(ctx.model?.id || "")) return;
709
710    const payload = event.payload as Record<string, unknown>;
711
712    // Start the in-flight indicator. It stays silent for IN_FLIGHT_GRACE_MS;
713    // only on a genuinely long wait does it surface a neutral
714    // "working… 12s" so the chat UI isn't indistinguishable from a hung
715    // model. It deliberately does NOT claim "optimizing context" — the
716    // extension can't observe whether compaction is happening (see
717    // ``markRequestSent`` for the Pi-API limitation that blocks the real
718    // phase signal).
719    markRequestSent(ctx);
720
721    // Upgrade the X_NW_CONVERSATION_ID env var (seeded with a UUID at
722    // extension load) to Pi's stable per-invocation session id. The SDK
723    // re-reads process.env on every stream, so this propagates to the
724    // next outbound request as the real X-NW-Conversation-ID HTTP header
725    // — no body touch. See the header-wiring block at the top of this fn
726    // and pi-header-surface.md for the full mechanism trace.
727    const { id: conversationId, source: conversationIdSource } =
728      resolveConversationId(ctx);
729    process.env[CONV_ID_ENV] = conversationId;
730    nwlog("conversation_id_attached", {
731      conversation_id_prefix: conversationId.slice(0, 8),
732      source: conversationIdSource,
733    });
734
735    // X-MCR-Session-FP fast-path hint: previously set via the same
736    // body-only mechanism that never reached the wire. Currently a no-op;
737    // diagnostic log retained so we still see when the gateway has
738    // assigned an fp. Revisit via registerProvider if we want it back.
739    if (state.sessionFp) {
740      nwlog("before_provider_request_fp_set", {
741        session_fp: state.sessionFp,
742        safe_drop_before: state.safeDropBefore,
743      });
744    }
745
746    // DEBUG (#3323 follow-up): capture the actual outbound payload shape
747    // so we can diagnose why the deployed pin-final-user-turn fix isn't
748    // firing for real Pi sessions. Logs shape only — no message content.
749    try {
750      const msgs = (payload.body as { messages?: Array<Record<string, unknown>> } | undefined)?.messages
751        ?? (payload.messages as Array<Record<string, unknown>> | undefined)
752        ?? [];
753      const roles = msgs.map((m) => String(m.role ?? m.type ?? "?"));
754      const lastN = msgs.slice(-5).map((m) => {
755        const role = String(m.role ?? m.type ?? "?");
756        const contentField = m.content;
757        let contentKind: string;
758        let contentLen: number;
759        if (typeof contentField === "string") {
760          contentKind = "string";
761          contentLen = contentField.length;
762        } else if (Array.isArray(contentField)) {
763          contentKind = "array[" + contentField.length + "]";
764          contentLen = contentField.reduce((acc, b) => acc + JSON.stringify(b).length, 0);
765        } else if (contentField === null || contentField === undefined) {
766          contentKind = "null";
767          contentLen = 0;
768        } else {
769          contentKind = typeof contentField;
770          contentLen = JSON.stringify(contentField).length;
771        }
772        const blockTypes = Array.isArray(contentField)
773          ? (contentField as Array<Record<string, unknown>>).map((b) => String(b.type ?? "?"))
774          : null;
775        return { role, contentKind, contentLen, blockTypes, hasToolCallId: "tool_call_id" in m };
776      });
777      const roleCounts: Record<string, number> = {};
778      for (const r of roles) roleCounts[r] = (roleCounts[r] || 0) + 1;
779      nwlog("outbound_payload_shape", {
780        session_fp: state.sessionFp,
781        num_messages: msgs.length,
782        role_distribution: roleCounts,
783        last_5_messages: lastN,
784        stream: Boolean((payload.body as { stream?: boolean } | undefined)?.stream ?? payload.stream),
785        payload_keys: Object.keys(payload),
786      });
787    } catch (err) {
788      nwlog("outbound_payload_shape_error", {
789        session_fp: state.sessionFp,
790        error: err instanceof Error ? err.message : String(err),
791      });
792    }
793  });
794
795  pi.on("session_before_compact", async (_event, ctx) => {
796    if (!isMCRModel(ctx.model?.id || "")) return;
797    if (state.sessionFp) {
798      nwlog("compaction_cancelled", { session_fp: state.sessionFp });
799      return { cancel: true };
800    }
801  });
802
803  pi.on("session_start", async (_event, ctx) => {
804    nwlog("session_start", { extension_version: EXTENSION_VERSION });
805    state.sessionFp = null;
806    state.safeDropBefore = 0;
807    state.storedThrough = 0;
808    state.totalEnergyJoules = 0;
809    state.sessionTurns = 0;
810    state.contextTokens = 0;
811    state.lastMcrMeta = null;
812    state.lastEnergy = null;
813    // Clear any in-flight indicator left over from a prior
814    // session (defensive — session_start would normally fire after any
815    // active request has completed, but a forced restart or fork can land
816    // here while inFlightSince is still set).
817    state.inFlightSince = null;
818    if (state.inFlightTickerHandle !== null) {
819      clearInterval(state.inFlightTickerHandle);
820      state.inFlightTickerHandle = null;
821    }
822    // Reset the UUID fallback so a new Pi session gets a fresh conversation
823    // id when getSessionId() isn't usable. The Pi session id itself rotates
824    // on its own.
825    uuidFallback = null;
826    ctx.ui.setStatus(MCR_STATUS_KEY, "");
827    ctx.ui.setStatus(ENERGY_STATUS_KEY, "");
828  });
829
830  pi.on("session_tree", async (_event, ctx) => {
831    // Branch navigation invalidates MCR session state — sessionFp and
832    // safeDropBefore are tied to a specific message sequence that no
833    // longer matches the new branch. Clear everything and let the
834    // next server response repopulate. Energy is replayed from the
835    // session log (same as the main provider's session_tree handler).
836    state.sessionFp = null;
837    state.safeDropBefore = 0;
838    state.storedThrough = 0;
839    state.totalEnergyJoules = 0;
840    state.sessionTurns = 0;
841    state.contextTokens = 0;
842    state.lastMcrMeta = null;
843    state.lastEnergy = null;
844
845    // Replay energy events from the session log for the new branch.
846    for (const entry of ctx.sessionManager.getBranch()) {
847      if (
848        entry.type === "custom" &&
849        entry.customType === "neuralwatt-energy" &&
850        typeof entry.data === "object" &&
851        entry.data
852      ) {
853        state.totalEnergyJoules += (entry.data as { energy_joules: number }).energy_joules || 0;
854      }
855    }
856
857    nwlog("session_tree", { total_energy_replayed: state.totalEnergyJoules });
858    updateStatusBar(ctx);
859  });
860
861  pi.on("session_shutdown", async (_event, ctx) => {
862    nwlog("session_shutdown", {
863      final_session_fp: state.sessionFp,
864      total_energy_joules: state.totalEnergyJoules,
865      session_turns: state.sessionTurns,
866    });
867    // Tear down the in-flight ticker so the interval handle
868    // doesn't outlive the session.
869    state.inFlightSince = null;
870    if (state.inFlightTickerHandle !== null) {
871      clearInterval(state.inFlightTickerHandle);
872      state.inFlightTickerHandle = null;
873    }
874    ctx.ui.setStatus(MCR_STATUS_KEY, "");
875    ctx.ui.setStatus(ENERGY_STATUS_KEY, "");
876  });
877}