/** A single named event carried by an {@link ActiveSseStream}. */
export type SseStreamEvent = {
  event: string;
  data: unknown;
};

/** Callback invoked once for every event delivered to a subscriber. */
type SseStreamListener = (event: SseStreamEvent) => void;

/**
 * In-memory event stream that buffers every emitted event so that late
 * subscribers can replay the full history before receiving live events.
 *
 * Once {@link ActiveSseStream.complete} has been called the stream is frozen:
 * further `emit` calls are ignored, all listeners are dropped, and `done`
 * resolves. The buffered history stays available for replay via `subscribe`.
 */
export class ActiveSseStream {
  /** Every event emitted so far, in emission order (kept for replay). */
  private readonly events: SseStreamEvent[] = [];
  /** Currently attached live listeners. */
  private readonly listeners = new Set<SseStreamListener>();
  private completed = false;
  // Assigned synchronously by the Promise executor in the constructor.
  private resolveDone!: () => void;

  /** Resolves (with no value) when the stream is completed. */
  readonly done: Promise<void>;

  constructor() {
    this.done = new Promise<void>((resolve) => {
      this.resolveDone = resolve;
    });
  }

  /** `true` once `complete()` has run. */
  get isCompleted(): boolean {
    return this.completed;
  }

  /**
   * Records an event and delivers it to every attached listener.
   * Does nothing if the stream has already completed.
   *
   * @param event - Event name.
   * @param data - Arbitrary payload associated with the event.
   */
  emit(event: string, data: unknown): void {
    if (this.completed) return;

    const entry: SseStreamEvent = { event, data };
    this.events.push(entry);

    // Dispatch over a snapshot: a listener that subscribes a new listener
    // while handling this event must not cause the newcomer to be visited by
    // this loop as well, because `subscribe` has already replayed `entry` to
    // it from the buffer (iterating the live Set would deliver it twice).
    for (const listener of [...this.listeners]) {
      // Skip listeners that were unsubscribed (or cleared by `complete`)
      // earlier in this same dispatch, matching live-Set iteration semantics.
      if (!this.listeners.has(listener)) continue;
      listener(entry);
    }
  }

  /**
   * Marks the stream as completed, optionally emitting one last event first.
   * Subsequent calls are no-ops.
   *
   * @param finalEvent - Optional event emitted (and buffered) before the
   *   stream is frozen.
   */
  complete(finalEvent?: SseStreamEvent): void {
    if (this.completed) return;

    if (finalEvent) {
      this.emit(finalEvent.event, finalEvent.data);
    }

    this.completed = true;
    this.listeners.clear();
    this.resolveDone();
  }

  /**
   * Replays all buffered events to `listener` synchronously, then attaches it
   * for live events unless the stream is already completed.
   *
   * @param listener - Callback receiving replayed and live events.
   * @returns A function that detaches the listener; a no-op when the stream
   *   was already completed at subscription time.
   */
  subscribe(listener: SseStreamListener): () => void {
    for (const event of this.events) {
      listener(event);
    }

    if (this.completed) {
      return () => {};
    }

    this.listeners.add(listener);
    return () => {
      this.listeners.delete(listener);
    };
  }
}