live-session-store.mjs

  1import fs from 'node:fs';
  2import path from 'node:path';
  3import { getLegacyLiveSessionsDir, getLiveSessionsDir } from './impeccable-paths.mjs';
  4
  5const COMPLETED_PHASES = new Set(['completed', 'discarded']);
  6
  7export function createLiveSessionStore({ cwd = process.cwd(), sessionId } = {}) {
  8  const rootDir = getLiveSessionsDir(cwd);
  9  const legacyRootDir = getLegacyLiveSessionsDir(cwd);
 10  fs.mkdirSync(rootDir, { recursive: true });
 11  const snapshotCache = new Map();
 12
 13  function loadCachedOrRebuild(id) {
 14    const cached = snapshotCache.get(id);
 15    if (cached) return cached;
 16    const journalPath = getReadableJournalPath(id);
 17    const rebuilt = rebuildSnapshotFromJournal(journalPath, id);
 18    snapshotCache.set(id, rebuilt);
 19    return rebuilt;
 20  }
 21
 22  function getReadableJournalPath(id) {
 23    const primary = getJournalPath(rootDir, id);
 24    if (fs.existsSync(primary)) return primary;
 25    const legacy = getJournalPath(legacyRootDir, id);
 26    if (fs.existsSync(legacy)) return legacy;
 27    return primary;
 28  }
 29
 30  return {
 31    rootDir,
 32    legacyRootDir,
 33    appendEvent(event) {
 34      const normalized = normalizeEvent(event, sessionId);
 35      const journalPath = getJournalPath(rootDir, normalized.id);
 36      const snapshotPath = getSnapshotPath(rootDir, normalized.id);
 37      const legacyJournalPath = getJournalPath(legacyRootDir, normalized.id);
 38      if (!fs.existsSync(journalPath) && fs.existsSync(legacyJournalPath)) {
 39        fs.copyFileSync(legacyJournalPath, journalPath);
 40      }
 41      const prior = loadCachedOrRebuild(normalized.id);
 42      const seq = prior.nextSeq;
 43      const entry = {
 44        seq,
 45        id: normalized.id,
 46        type: normalized.type,
 47        ts: new Date().toISOString(),
 48        event: normalized,
 49      };
 50      fs.appendFileSync(journalPath, JSON.stringify(entry) + '\n');
 51      const next = applyEvent(prior.snapshot, entry, prior.diagnostics);
 52      snapshotCache.set(normalized.id, { snapshot: next, diagnostics: next.diagnostics || [], nextSeq: seq + 1 });
 53      writeSnapshot(snapshotPath, next);
 54      return next;
 55    },
 56    getSnapshot(id = sessionId, opts = {}) {
 57      if (!id) throw new Error('session id required');
 58      const journalPath = getReadableJournalPath(id);
 59      const snapshotPath = getSnapshotPath(rootDir, id);
 60      const rebuilt = rebuildSnapshotFromJournal(journalPath, id);
 61      snapshotCache.set(id, rebuilt);
 62      writeSnapshot(snapshotPath, rebuilt.snapshot);
 63      if (!opts.includeCompleted && COMPLETED_PHASES.has(rebuilt.snapshot.phase)) return null;
 64      return rebuilt.snapshot;
 65    },
 66    listActiveSessions() {
 67      const ids = new Set();
 68      for (const dir of [legacyRootDir, rootDir]) {
 69        if (!fs.existsSync(dir)) continue;
 70        for (const name of fs.readdirSync(dir)) {
 71          if (name.endsWith('.jsonl')) ids.add(name.slice(0, -'.jsonl'.length));
 72        }
 73      }
 74      return [...ids]
 75        .sort()
 76        .map((id) => this.getSnapshot(id))
 77        .filter(Boolean);
 78    },
 79  };
 80}
 81
 82function normalizeEvent(event, fallbackId) {
 83  if (!event || typeof event !== 'object') throw new Error('event object required');
 84  const id = event.id || fallbackId;
 85  if (!id || typeof id !== 'string') throw new Error('event id required');
 86  if (!event.type || typeof event.type !== 'string') throw new Error('event type required');
 87  return { ...event, id };
 88}
 89
 90function getJournalPath(rootDir, id) {
 91  return path.join(rootDir, safeSessionId(id) + '.jsonl');
 92}
 93
 94function getSnapshotPath(rootDir, id) {
 95  return path.join(rootDir, safeSessionId(id) + '.snapshot.json');
 96}
 97
 98function safeSessionId(id) {
 99  if (!/^[A-Za-z0-9_-]{1,128}$/.test(id)) throw new Error('invalid session id: ' + id);
100  return id;
101}
102
103function baseSnapshot(id) {
104  return {
105    id,
106    phase: 'new',
107    pageUrl: null,
108    sourceFile: null,
109    expectedVariants: 0,
110    arrivedVariants: 0,
111    visibleVariant: null,
112    paramValues: {},
113    pendingEventSeq: null,
114    pendingEvent: null,
115    deliveryLease: null,
116    checkpointRevision: 0,
117    activeOwner: null,
118    sourceMarkers: {},
119    fallbackMode: null,
120    annotationArtifacts: [],
121    diagnostics: [],
122    updatedAt: null,
123  };
124}
125
126function rebuildSnapshotFromJournal(journalPath, id) {
127  let snapshot = baseSnapshot(id);
128  const diagnostics = [];
129  let nextSeq = 1;
130  if (!fs.existsSync(journalPath)) return { snapshot, diagnostics, nextSeq };
131
132  const lines = fs.readFileSync(journalPath, 'utf-8').split('\n');
133  for (let i = 0; i < lines.length; i++) {
134    const line = lines[i];
135    if (!line.trim()) continue;
136    try {
137      const entry = JSON.parse(line);
138      if (!entry || typeof entry !== 'object') throw new Error('entry is not object');
139      if (Number.isInteger(entry.seq)) nextSeq = Math.max(nextSeq, entry.seq + 1);
140      snapshot = applyEvent(snapshot, entry);
141    } catch (err) {
142      diagnostics.push({
143        error: 'journal_parse_failed',
144        line: i + 1,
145        message: err.message,
146      });
147    }
148  }
149  snapshot.diagnostics = [...snapshot.diagnostics, ...diagnostics];
150  return { snapshot, diagnostics, nextSeq };
151}
152
153function applyEvent(snapshot, entry, inheritedDiagnostics = []) {
154  const event = entry.event || entry;
155  const next = {
156    ...snapshot,
157    paramValues: { ...(snapshot.paramValues || {}) },
158    sourceMarkers: { ...(snapshot.sourceMarkers || {}) },
159    annotationArtifacts: [...(snapshot.annotationArtifacts || [])],
160    diagnostics: [...(snapshot.diagnostics || [])],
161    updatedAt: entry.ts || new Date().toISOString(),
162  };
163
164  if (inheritedDiagnostics.length && next.diagnostics.length === 0) {
165    next.diagnostics = [...inheritedDiagnostics];
166  }
167
168  switch (event.type) {
169    case 'generate':
170      next.phase = 'generate_requested';
171      next.pageUrl = event.pageUrl ?? next.pageUrl;
172      next.expectedVariants = event.count ?? next.expectedVariants;
173      next.pendingEventSeq = entry.seq ?? next.pendingEventSeq;
174      next.pendingEvent = toPendingEvent(event);
175      if (event.screenshotPath) upsertArtifact(next.annotationArtifacts, { type: 'screenshot', path: event.screenshotPath });
176      break;
177    case 'variants_ready':
178    case 'agent_done':
179      next.phase = event.carbonize === true ? 'carbonize_required' : 'variants_ready';
180      next.sourceFile = event.file ?? next.sourceFile;
181      next.arrivedVariants = event.arrivedVariants ?? (next.arrivedVariants ?? next.expectedVariants);
182      next.pendingEventSeq = null;
183      next.pendingEvent = null;
184      if (event.carbonize === true) {
185        next.diagnostics.push({
186          error: 'carbonize_cleanup_required',
187          file: event.file || null,
188          message: 'Accepted variant still has carbonize markers that must be folded into source CSS.',
189        });
190      }
191      break;
192    case 'checkpoint':
193      if ((event.revision ?? 0) >= (next.checkpointRevision ?? 0)) {
194        next.phase = event.phase ?? next.phase;
195        next.checkpointRevision = event.revision ?? next.checkpointRevision;
196        next.activeOwner = event.owner ?? next.activeOwner;
197        next.arrivedVariants = event.arrivedVariants ?? next.arrivedVariants;
198        next.visibleVariant = event.visibleVariant ?? next.visibleVariant;
199        if (event.paramValues) next.paramValues = { ...event.paramValues };
200      } else {
201        next.diagnostics.push({ error: 'stale_checkpoint_ignored', revision: event.revision });
202      }
203      break;
204    case 'accept':
205    case 'accept_intent':
206      next.phase = 'accept_requested';
207      next.visibleVariant = Number(event.variantId ?? next.visibleVariant);
208      if (event.paramValues) next.paramValues = { ...event.paramValues };
209      next.pendingEventSeq = entry.seq ?? next.pendingEventSeq;
210      next.pendingEvent = toPendingEvent(event);
211      break;
212    case 'discard':
213      next.phase = 'discard_requested';
214      next.pendingEventSeq = entry.seq ?? next.pendingEventSeq;
215      next.pendingEvent = toPendingEvent(event);
216      break;
217    case 'discarded':
218      next.phase = 'discarded';
219      next.pendingEventSeq = null;
220      next.pendingEvent = null;
221      break;
222    case 'complete':
223      next.phase = 'completed';
224      next.pendingEventSeq = null;
225      next.pendingEvent = null;
226      break;
227    case 'agent_error':
228      next.phase = 'agent_error';
229      next.pendingEventSeq = null;
230      next.pendingEvent = null;
231      next.diagnostics.push({ error: 'agent_error', message: event.message || 'unknown agent error' });
232      break;
233    default:
234      next.diagnostics.push({ error: 'unknown_event_type', type: event.type });
235      break;
236  }
237  return next;
238}
239
240function toPendingEvent(event) {
241  const pending = { ...event };
242  delete pending.token;
243  return pending;
244}
245
246function upsertArtifact(artifacts, artifact) {
247  if (!artifacts.some((existing) => existing.path === artifact.path && existing.type === artifact.type)) {
248    artifacts.push(artifact);
249  }
250}
251
252function writeSnapshot(snapshotPath, snapshot) {
253  fs.writeFileSync(snapshotPath, JSON.stringify(snapshot, null, 2) + '\n');
254}