1import fs from 'node:fs';
2import path from 'node:path';
3import { getLegacyLiveSessionsDir, getLiveSessionsDir } from './impeccable-paths.mjs';
4
5const COMPLETED_PHASES = new Set(['completed', 'discarded']);
6
7export function createLiveSessionStore({ cwd = process.cwd(), sessionId } = {}) {
8 const rootDir = getLiveSessionsDir(cwd);
9 const legacyRootDir = getLegacyLiveSessionsDir(cwd);
10 fs.mkdirSync(rootDir, { recursive: true });
11 const snapshotCache = new Map();
12
13 function loadCachedOrRebuild(id) {
14 const cached = snapshotCache.get(id);
15 if (cached) return cached;
16 const journalPath = getReadableJournalPath(id);
17 const rebuilt = rebuildSnapshotFromJournal(journalPath, id);
18 snapshotCache.set(id, rebuilt);
19 return rebuilt;
20 }
21
22 function getReadableJournalPath(id) {
23 const primary = getJournalPath(rootDir, id);
24 if (fs.existsSync(primary)) return primary;
25 const legacy = getJournalPath(legacyRootDir, id);
26 if (fs.existsSync(legacy)) return legacy;
27 return primary;
28 }
29
30 return {
31 rootDir,
32 legacyRootDir,
33 appendEvent(event) {
34 const normalized = normalizeEvent(event, sessionId);
35 const journalPath = getJournalPath(rootDir, normalized.id);
36 const snapshotPath = getSnapshotPath(rootDir, normalized.id);
37 const legacyJournalPath = getJournalPath(legacyRootDir, normalized.id);
38 if (!fs.existsSync(journalPath) && fs.existsSync(legacyJournalPath)) {
39 fs.copyFileSync(legacyJournalPath, journalPath);
40 }
41 const prior = loadCachedOrRebuild(normalized.id);
42 const seq = prior.nextSeq;
43 const entry = {
44 seq,
45 id: normalized.id,
46 type: normalized.type,
47 ts: new Date().toISOString(),
48 event: normalized,
49 };
50 fs.appendFileSync(journalPath, JSON.stringify(entry) + '\n');
51 const next = applyEvent(prior.snapshot, entry, prior.diagnostics);
52 snapshotCache.set(normalized.id, { snapshot: next, diagnostics: next.diagnostics || [], nextSeq: seq + 1 });
53 writeSnapshot(snapshotPath, next);
54 return next;
55 },
56 getSnapshot(id = sessionId, opts = {}) {
57 if (!id) throw new Error('session id required');
58 const journalPath = getReadableJournalPath(id);
59 const snapshotPath = getSnapshotPath(rootDir, id);
60 const rebuilt = rebuildSnapshotFromJournal(journalPath, id);
61 snapshotCache.set(id, rebuilt);
62 writeSnapshot(snapshotPath, rebuilt.snapshot);
63 if (!opts.includeCompleted && COMPLETED_PHASES.has(rebuilt.snapshot.phase)) return null;
64 return rebuilt.snapshot;
65 },
66 listActiveSessions() {
67 const ids = new Set();
68 for (const dir of [legacyRootDir, rootDir]) {
69 if (!fs.existsSync(dir)) continue;
70 for (const name of fs.readdirSync(dir)) {
71 if (name.endsWith('.jsonl')) ids.add(name.slice(0, -'.jsonl'.length));
72 }
73 }
74 return [...ids]
75 .sort()
76 .map((id) => this.getSnapshot(id))
77 .filter(Boolean);
78 },
79 };
80}
81
82function normalizeEvent(event, fallbackId) {
83 if (!event || typeof event !== 'object') throw new Error('event object required');
84 const id = event.id || fallbackId;
85 if (!id || typeof id !== 'string') throw new Error('event id required');
86 if (!event.type || typeof event.type !== 'string') throw new Error('event type required');
87 return { ...event, id };
88}
89
90function getJournalPath(rootDir, id) {
91 return path.join(rootDir, safeSessionId(id) + '.jsonl');
92}
93
94function getSnapshotPath(rootDir, id) {
95 return path.join(rootDir, safeSessionId(id) + '.snapshot.json');
96}
97
98function safeSessionId(id) {
99 if (!/^[A-Za-z0-9_-]{1,128}$/.test(id)) throw new Error('invalid session id: ' + id);
100 return id;
101}
102
103function baseSnapshot(id) {
104 return {
105 id,
106 phase: 'new',
107 pageUrl: null,
108 sourceFile: null,
109 expectedVariants: 0,
110 arrivedVariants: 0,
111 visibleVariant: null,
112 paramValues: {},
113 pendingEventSeq: null,
114 pendingEvent: null,
115 deliveryLease: null,
116 checkpointRevision: 0,
117 activeOwner: null,
118 sourceMarkers: {},
119 fallbackMode: null,
120 annotationArtifacts: [],
121 diagnostics: [],
122 updatedAt: null,
123 };
124}
125
126function rebuildSnapshotFromJournal(journalPath, id) {
127 let snapshot = baseSnapshot(id);
128 const diagnostics = [];
129 let nextSeq = 1;
130 if (!fs.existsSync(journalPath)) return { snapshot, diagnostics, nextSeq };
131
132 const lines = fs.readFileSync(journalPath, 'utf-8').split('\n');
133 for (let i = 0; i < lines.length; i++) {
134 const line = lines[i];
135 if (!line.trim()) continue;
136 try {
137 const entry = JSON.parse(line);
138 if (!entry || typeof entry !== 'object') throw new Error('entry is not object');
139 if (Number.isInteger(entry.seq)) nextSeq = Math.max(nextSeq, entry.seq + 1);
140 snapshot = applyEvent(snapshot, entry);
141 } catch (err) {
142 diagnostics.push({
143 error: 'journal_parse_failed',
144 line: i + 1,
145 message: err.message,
146 });
147 }
148 }
149 snapshot.diagnostics = [...snapshot.diagnostics, ...diagnostics];
150 return { snapshot, diagnostics, nextSeq };
151}
152
153function applyEvent(snapshot, entry, inheritedDiagnostics = []) {
154 const event = entry.event || entry;
155 const next = {
156 ...snapshot,
157 paramValues: { ...(snapshot.paramValues || {}) },
158 sourceMarkers: { ...(snapshot.sourceMarkers || {}) },
159 annotationArtifacts: [...(snapshot.annotationArtifacts || [])],
160 diagnostics: [...(snapshot.diagnostics || [])],
161 updatedAt: entry.ts || new Date().toISOString(),
162 };
163
164 if (inheritedDiagnostics.length && next.diagnostics.length === 0) {
165 next.diagnostics = [...inheritedDiagnostics];
166 }
167
168 switch (event.type) {
169 case 'generate':
170 next.phase = 'generate_requested';
171 next.pageUrl = event.pageUrl ?? next.pageUrl;
172 next.expectedVariants = event.count ?? next.expectedVariants;
173 next.pendingEventSeq = entry.seq ?? next.pendingEventSeq;
174 next.pendingEvent = toPendingEvent(event);
175 if (event.screenshotPath) upsertArtifact(next.annotationArtifacts, { type: 'screenshot', path: event.screenshotPath });
176 break;
177 case 'variants_ready':
178 case 'agent_done':
179 next.phase = event.carbonize === true ? 'carbonize_required' : 'variants_ready';
180 next.sourceFile = event.file ?? next.sourceFile;
181 next.arrivedVariants = event.arrivedVariants ?? (next.arrivedVariants ?? next.expectedVariants);
182 next.pendingEventSeq = null;
183 next.pendingEvent = null;
184 if (event.carbonize === true) {
185 next.diagnostics.push({
186 error: 'carbonize_cleanup_required',
187 file: event.file || null,
188 message: 'Accepted variant still has carbonize markers that must be folded into source CSS.',
189 });
190 }
191 break;
192 case 'checkpoint':
193 if ((event.revision ?? 0) >= (next.checkpointRevision ?? 0)) {
194 next.phase = event.phase ?? next.phase;
195 next.checkpointRevision = event.revision ?? next.checkpointRevision;
196 next.activeOwner = event.owner ?? next.activeOwner;
197 next.arrivedVariants = event.arrivedVariants ?? next.arrivedVariants;
198 next.visibleVariant = event.visibleVariant ?? next.visibleVariant;
199 if (event.paramValues) next.paramValues = { ...event.paramValues };
200 } else {
201 next.diagnostics.push({ error: 'stale_checkpoint_ignored', revision: event.revision });
202 }
203 break;
204 case 'accept':
205 case 'accept_intent':
206 next.phase = 'accept_requested';
207 next.visibleVariant = Number(event.variantId ?? next.visibleVariant);
208 if (event.paramValues) next.paramValues = { ...event.paramValues };
209 next.pendingEventSeq = entry.seq ?? next.pendingEventSeq;
210 next.pendingEvent = toPendingEvent(event);
211 break;
212 case 'discard':
213 next.phase = 'discard_requested';
214 next.pendingEventSeq = entry.seq ?? next.pendingEventSeq;
215 next.pendingEvent = toPendingEvent(event);
216 break;
217 case 'discarded':
218 next.phase = 'discarded';
219 next.pendingEventSeq = null;
220 next.pendingEvent = null;
221 break;
222 case 'complete':
223 next.phase = 'completed';
224 next.pendingEventSeq = null;
225 next.pendingEvent = null;
226 break;
227 case 'agent_error':
228 next.phase = 'agent_error';
229 next.pendingEventSeq = null;
230 next.pendingEvent = null;
231 next.diagnostics.push({ error: 'agent_error', message: event.message || 'unknown agent error' });
232 break;
233 default:
234 next.diagnostics.push({ error: 'unknown_event_type', type: event.type });
235 break;
236 }
237 return next;
238}
239
240function toPendingEvent(event) {
241 const pending = { ...event };
242 delete pending.token;
243 return pending;
244}
245
246function upsertArtifact(artifacts, artifact) {
247 if (!artifacts.some((existing) => existing.path === artifact.path && existing.type === artifact.type)) {
248 artifacts.push(artifact);
249 }
250}
251
252function writeSnapshot(snapshotPath, snapshot) {
253 fs.writeFileSync(snapshotPath, JSON.stringify(snapshot, null, 2) + '\n');
254}