import { spawn } from 'node:child_process'; import { randomUUID } from 'node:crypto'; import fs from 'node:fs/promises'; import { createServer } from 'node:http'; import path from 'node:path'; import { Readable } from 'node:stream'; import { fileURLToPath } from 'node:url'; import express from 'express'; import { WebSocket, WebSocketServer } from 'ws'; const __dirname = path.dirname(fileURLToPath(import.meta.url)); const publicDir = path.join(__dirname, '..', 'public'); const app = express(); const server = createServer(app); const wss = new WebSocketServer({ noServer: true }); const PORT = Number(process.env.PORT ?? 3000); const FFMPEG_PATH = process.env.FFMPEG_PATH ?? 'ffmpeg'; const FFPROBE_PATH = process.env.FFPROBE_PATH ?? 'ffprobe'; const FFMPEG_LOG_LEVEL = process.env.FFMPEG_LOG_LEVEL ?? 'warning'; const FFMPEG_INPUT_SEEKABLE = process.env.FFMPEG_INPUT_SEEKABLE ?? '0'; const PLAYBACK_CONNECTION_MODE = parsePlaybackConnectionMode(process.env.PLAYBACK_CONNECTION_MODE ?? process.env.PLAYBACK_MODE); const RECENT_URLS_PATH = process.env.RECENT_URLS_PATH ?? path.join(__dirname, '..', 'data', 'recent-urls.json'); const RECENT_URL_LIMIT = clampInteger(process.env.RECENT_URL_LIMIT, 12, 1, 50); const SESSION_TTL_MS = 60 * 60 * 1000; const METADATA_PROBE_TIMEOUT_MS = 8 * 1000; const PLAYBACK_READY_TIMEOUT_MS = 15 * 1000; const MAX_WS_BUFFER_BYTES = clampInteger(process.env.MAX_WS_BUFFER_BYTES, 2 * 1024 * 1024, 128 * 1024, 64 * 1024 * 1024); const MAX_AUDIO_QUEUE_BYTES = clampInteger(process.env.MAX_AUDIO_QUEUE_BYTES, 16 * 1024 * 1024, 256 * 1024, 128 * 1024 * 1024); const MAX_RELAY_BRANCH_QUEUE_BYTES = clampInteger(process.env.MAX_RELAY_BRANCH_QUEUE_BYTES, 16 * 1024 * 1024, 512 * 1024, 256 * 1024 * 1024); const RELAY_BRANCH_PAUSE_BYTES = Math.floor(MAX_RELAY_BRANCH_QUEUE_BYTES / 2); const JPEG_SOI = Buffer.from([0xff, 0xd8]); const JPEG_EOI = Buffer.from([0xff, 0xd9]); const defaults = { fps: 24, width: 960, quality: 5, audioBitrate: '160k', }; const sessions = new Map(); const sourceTokens = new Map(); const playbacks = new Map(); let recentUrls = []; let recentWrite = Promise.resolve(); app.disable('x-powered-by'); app.use(express.json({ limit: '32kb' })); app.use(express.static(publicDir)); app.get('/api/health', (_request, response) => { response.json({ ok: true, ffmpeg: FFMPEG_PATH, playbackConnectionMode: PLAYBACK_CONNECTION_MODE }); }); app.get('/api/recent-urls', (_request, response) => { response.json({ urls: recentUrls.map((item) => ({ url: item.url, displayUrl: redactSecrets(item.url), lastPlayedAt: item.lastPlayedAt, })), }); }); app.post('/api/session', async (request, response) => { let url; try { url = parseStreamUrl(request.body?.url); } catch (error) { response.status(400).json({ error: error.message }); return; } const options = parsePlaybackOptions(request.body); const id = randomUUID(); sessions.set(id, { id, url, options, duration: null, metadataStatus: 'pending', seekSeconds: 0, seekGeneration: 0, createdAt: Date.now(), lastUsedAt: Date.now(), }); startSessionMetadataProbe(sessions.get(id)); try { await addRecentUrl(url); } catch (error) { console.warn(`Failed to store recent URL: ${error.message}`); } response.status(201).json(formatSessionPayload(sessions.get(id))); }); app.get('/api/session/:sessionId', (request, response) => { const session = getSession(request.params.sessionId); if (!session) { response.status(404).json({ error: 'Unknown or expired session.' }); return; } response.json(formatSessionPayload(session)); }); app.post('/api/session/:sessionId/seek', (request, response) => { const session = getSession(request.params.sessionId); if (!session) { response.status(404).json({ error: 'Unknown or expired session.' }); return; } if (!isSessionSeekable(session)) { response.status(409).json({ error: 'This stream is not seekable.' }); return; } const requestedSeconds = Number(request.body?.time); if (!Number.isFinite(requestedSeconds)) { response.status(400).json({ error: 'A numeric seek time is required.' }); return; } const seekSeconds = clampNumber(requestedSeconds, 0, session.duration); const playback = playbacks.get(session.id); session.seekSeconds = seekSeconds; session.seekGeneration += 1; session.lastUsedAt = Date.now(); if (playback && !playback.closed) { playback.stop('client_seek'); if (playbacks.get(session.id) === playback) { playbacks.delete(session.id); } } logInfo(`session seeked id=${shortId(session.id)} time=${seekSeconds.toFixed(3)} duration=${session.duration.toFixed(3)}`); response.json(formatSessionPayload(session)); }); app.all('/_source/:token', async (request, response) => { const source = sourceTokens.get(request.params.token); const session = source ? getSession(source.sessionId) : null; const startedAt = Date.now(); const sourceLabel = source ? `${source.kind}:${shortId(source.sessionId)}` : `unknown:${request.params.token.slice(0, 8)}`; let bytes = 0; let upstreamEnded = false; let upstreamStatus = 'pending'; if (!session) { response.status(404).end(); return; } const controller = new AbortController(); const logClose = once(() => { logInfo(`source closed kind=${sourceLabel} status=${upstreamStatus} bytes=${bytes} upstreamEnded=${upstreamEnded} durationMs=${Date.now() - startedAt}`); }); const cleanup = once(() => { controller.abort(); logClose(); }); response.on('close', cleanup); try { const headers = new Headers(); const range = request.get('range'); if (range) { headers.set('range', range); } headers.set('accept-encoding', 'identity'); const upstream = await fetch(session.url, { method: request.method === 'HEAD' ? 'HEAD' : 'GET', headers, redirect: 'follow', signal: controller.signal, }); upstreamStatus = String(upstream.status); logInfo(`source connected kind=${sourceLabel} status=${upstream.status} contentType=${upstream.headers.get('content-type') ?? 'unknown'}`); response.status(upstream.status); copyUpstreamHeaders(upstream.headers, response); if (request.method === 'HEAD' || !upstream.body) { response.end(); return; } Readable.fromWeb(upstream.body).on('data', (chunk) => { bytes += chunk.length; }).on('end', () => { upstreamEnded = true; }).on('error', (error) => { if (!controller.signal.aborted) { logWarn(`source stream error kind=${sourceLabel} error=${oneLine(error.message)}`); } if (!response.destroyed) { response.destroy(error); } }).pipe(response); } catch (error) { if (controller.signal.aborted) { return; } logWarn(`source failed kind=${sourceLabel} error=${oneLine(error.message)}`); if (!response.headersSent) { response.status(502).json({ error: 'Failed to fetch source stream.' }); } else { response.destroy(error); } } }); app.get('/audio/:sessionId', (request, response) => { const session = getSession(request.params.sessionId); if (!session) { response.status(404).json({ error: 'Unknown or expired session.' }); return; } if (PLAYBACK_CONNECTION_MODE === 'single' || PLAYBACK_CONNECTION_MODE === 'relay') { getOrCreatePlayback(session).attachAudio(request, response); return; } streamSplitAudio(request, response, session); }); server.on('upgrade', (request, socket, head) => { const host = request.headers.host ?? 'localhost'; const { pathname } = new URL(request.url ?? '/', `http://${host}`); const match = pathname.match(/^\/frames\/([0-9a-f-]+)$/i); if (!match || !getSession(match[1])) { socket.write('HTTP/1.1 404 Not Found\r\n\r\n'); socket.destroy(); return; } wss.handleUpgrade(request, socket, head, (websocket) => { wss.emit('connection', websocket, request, match[1]); }); }); wss.on('connection', (websocket, _request, sessionId) => { const session = getSession(sessionId); if (!session) { websocket.close(1008, 'Unknown session'); return; } if (PLAYBACK_CONNECTION_MODE === 'single' || PLAYBACK_CONNECTION_MODE === 'relay') { getOrCreatePlayback(session).attachFrames(websocket); return; } streamSplitFrames(websocket, session); }); setInterval(() => { const now = Date.now(); for (const [id, session] of sessions) { if (now - session.lastUsedAt > SESSION_TTL_MS) { sessions.delete(id); } } for (const [token, source] of sourceTokens) { if (now - source.createdAt > SESSION_TTL_MS) { sourceTokens.delete(token); } } }, 60 * 1000).unref(); await loadRecentUrls(); server.listen(PORT, () => { console.log(`Frame stream app listening at http://localhost:${PORT} mode=${PLAYBACK_CONNECTION_MODE}`); }); function parseStreamUrl(value) { if (typeof value !== 'string' || value.trim().length === 0) { throw new Error('A stream URL is required.'); } if (value.length > 4096) { throw new Error('The stream URL is too long.'); } const parsed = new URL(value.trim()); if (!['http:', 'https:'].includes(parsed.protocol)) { throw new Error('Only http and https stream URLs are supported.'); } return parsed.toString(); } function parsePlaybackOptions(body) { return { fps: clampInteger(body?.fps, defaults.fps, 1, 30), width: clampInteger(body?.width, defaults.width, 160, 1920), quality: clampInteger(body?.quality, defaults.quality, 2, 18), audioBitrate: parseAudioBitrate(body?.audioBitrate), }; } function parsePlaybackConnectionMode(value) { if (value === 'relay' || value === 'tee') { return 'relay'; } if (value === 'single' || value === 'unified') { return 'single'; } if (value === 'split' || value === 'separate' || value === undefined || value === '') { return 'split'; } console.warn(`Unknown PLAYBACK_CONNECTION_MODE "${value}", using split.`); return 'split'; } function clampInteger(value, fallback, min, max) { const parsed = Number(value); if (!Number.isFinite(parsed)) { return fallback; } return Math.min(max, Math.max(min, Math.round(parsed))); } function clampNumber(value, min, max) { return Math.min(max, Math.max(min, value)); } function parseAudioBitrate(value) { if (typeof value !== 'string') { return defaults.audioBitrate; } return /^\d{2,3}k$/i.test(value) ? value.toLowerCase() : defaults.audioBitrate; } function formatSessionPayload(session) { return { id: session.id, options: session.options, duration: Number.isFinite(session.duration) ? session.duration : null, metadataStatus: session.metadataStatus, seekable: isSessionSeekable(session), seekSeconds: session.seekSeconds, seekGeneration: session.seekGeneration, }; } function isSessionSeekable(session) { return PLAYBACK_CONNECTION_MODE !== 'relay' && Number.isFinite(session.duration) && session.duration > 0; } function startSessionMetadataProbe(session) { void probeSessionDuration(session).then((duration) => { const current = sessions.get(session.id); if (current !== session) { return; } session.duration = duration; session.metadataStatus = Number.isFinite(duration) ? 'ready' : 'unavailable'; }).catch((error) => { const current = sessions.get(session.id); if (current !== session) { return; } session.duration = null; session.metadataStatus = 'unavailable'; logWarn(`duration probe failed id=${shortId(session.id)} error=${oneLine(error.message)}`); }); } async function probeSessionDuration(session) { const source = createSourceInput(session.id, 'probe'); try { const output = await runFfprobe([ '-v', 'error', '-show_entries', 'format=duration', '-of', 'json', source.url, ]); const payload = JSON.parse(output); const duration = Number(payload?.format?.duration); return Number.isFinite(duration) && duration > 0 ? duration : null; } finally { source.release(); } } function runFfprobe(args) { return new Promise((resolve, reject) => { const ffprobe = spawn(FFPROBE_PATH, args, { stdio: ['ignore', 'pipe', 'pipe'], }); let stdout = ''; let stderr = ''; let timedOut = false; let settled = false; const finish = (callback) => { if (settled) { return; } settled = true; clearTimeout(timer); callback(); }; const timer = setTimeout(() => { timedOut = true; stopProcess(ffprobe); }, METADATA_PROBE_TIMEOUT_MS); timer.unref(); ffprobe.stdout.on('data', (chunk) => { stdout = appendTail(stdout, chunk); }); ffprobe.stderr.on('data', (chunk) => { stderr = appendTail(stderr, chunk); }); ffprobe.on('error', (error) => { finish(() => reject(error)); }); ffprobe.on('close', (code, signal) => { finish(() => { if (timedOut) { reject(new Error('ffprobe timed out.')); return; } if (code === 0) { resolve(stdout); return; } const detail = redactSecrets(stderr).trim(); reject(new Error(detail ? `ffprobe exited with code ${code}: ${detail}` : `ffprobe exited with code ${code ?? 'null'} signal ${signal ?? 'none'}.`)); }); }); }); } async function loadRecentUrls() { try { const raw = await fs.readFile(RECENT_URLS_PATH, 'utf8'); const parsed = JSON.parse(raw); const items = Array.isArray(parsed?.urls) ? parsed.urls : []; recentUrls = items .filter((item) => typeof item?.url === 'string' && typeof item?.lastPlayedAt === 'string') .slice(0, RECENT_URL_LIMIT); } catch (error) { if (error.code !== 'ENOENT') { console.warn(`Failed to read recent URLs: ${error.message}`); } recentUrls = []; } } async function addRecentUrl(url) { const lastPlayedAt = new Date().toISOString(); recentUrls = [ { url, lastPlayedAt }, ...recentUrls.filter((item) => item.url !== url), ].slice(0, RECENT_URL_LIMIT); recentWrite = recentWrite.catch(() => {}).then(() => saveRecentUrls()); await recentWrite; } async function saveRecentUrls() { const directory = path.dirname(RECENT_URLS_PATH); const temporaryPath = `${RECENT_URLS_PATH}.${process.pid}.tmp`; const payload = `${JSON.stringify({ urls: recentUrls }, null, 2)}\n`; await fs.mkdir(directory, { recursive: true }); await fs.writeFile(temporaryPath, payload, 'utf8'); await fs.rename(temporaryPath, RECENT_URLS_PATH); } function getSession(sessionId) { const session = sessions.get(sessionId); if (!session) { return null; } session.lastUsedAt = Date.now(); return session; } function getOrCreatePlayback(session) { const existing = playbacks.get(session.id); if (existing && !existing.closed) { return existing; } const playback = PLAYBACK_CONNECTION_MODE === 'relay' ? createRelayPlayback(session) : createPlayback(session); playbacks.set(session.id, playback); return playback; } function streamSplitAudio(request, response, session) { const worker = createAudioWorker(session); const releaseWorker = once(worker.release); const ffmpeg = spawnFfmpeg(worker.args); const logger = createFfmpegLogger('audio', session.id, ffmpeg.pid); const stderrTail = createFfmpegStderrTail(); let responseFinished = false; let stopReason = 'process_exit'; logger.start(); response.set({ 'Cache-Control': 'no-store', 'Content-Type': 'audio/mpeg', 'X-Content-Type-Options': 'nosniff', }); response.flushHeaders(); ffmpeg.stderr.on('data', (chunk) => { stderrTail.write(chunk); logger.stderr(chunk); }); ffmpeg.stdout.on('error', (error) => { logWarn(`audio output error kind=audio:${shortId(session.id)} error=${oneLine(error.message)}`); stopReason = 'audio_output_error'; stopProcess(ffmpeg); }); ffmpeg.stdout.pipe(response); ffmpeg.on('error', (error) => { stopReason = 'start_error'; releaseWorker(); logger.error(error); if (!response.writableEnded) { response.end(); } }); ffmpeg.on('close', (code, signal) => { stderrTail.flush(); releaseWorker(); logger.close(code, signal, stopReason, stderrTail.value); if (!response.writableEnded) { response.end(); } }); response.on('finish', () => { responseFinished = true; }); const cleanup = once(() => { if (responseFinished || response.writableEnded) { return; } stopReason = 'client_disconnect'; stopProcess(ffmpeg); }); request.on('close', cleanup); response.on('close', cleanup); } function streamSplitFrames(websocket, session) { const worker = createFrameWorker(session); const releaseWorker = once(worker.release); const ffmpeg = spawnFfmpeg(worker.args); const logger = createFfmpegLogger('frames', session.id, ffmpeg.pid); const stderrTail = createFfmpegStderrTail(); const { fps, quality, width } = session.options; const label = `frames:${shortId(session.id)}`; let frameBuffer = Buffer.alloc(0); let frameIndex = 0; let skippedFrames = 0; let stopReason = 'process_exit'; let serverClosingWebSocket = false; logger.start(); sendJson(websocket, { type: 'ready', codec: 'jpeg', fps, quality, width, duration: session.duration, seekable: isSessionSeekable(session), seekSeconds: session.seekSeconds, }); ffmpeg.stdout.on('data', handleFrameData); ffmpeg.stdout.on('error', (error) => { logWarn(`frame output error kind=${label} error=${oneLine(error.message)}`); stopReason = 'frame_output_error'; stopProcess(ffmpeg); }); ffmpeg.stderr.on('data', (chunk) => { stderrTail.write(chunk); logger.stderr(chunk); }); ffmpeg.on('error', (error) => { stopReason = 'start_error'; releaseWorker(); logger.error(error); sendJson(websocket, { type: 'error', message: `Failed to start ffmpeg: ${error.message}`, }); websocket.close(1011, 'ffmpeg start failed'); }); ffmpeg.on('close', (code, signal) => { stderrTail.flush(); releaseWorker(); logger.close(code, signal, stopReason, stderrTail.value); if (isWebSocketOpen(websocket) && stopReason !== 'client_disconnect') { sendJson(websocket, { type: 'end', code, signal, skippedFrames, message: summarizeFfmpegExit(code, signal, stderrTail.value), }); serverClosingWebSocket = true; websocket.close(1000, 'ffmpeg exited'); } logInfo(`frames closed kind=${label} reason=${stopReason} frames=${frameIndex} skippedFrames=${skippedFrames}`); }); websocket.on('close', () => { if (serverClosingWebSocket) { return; } stopReason = 'client_disconnect'; stopProcess(ffmpeg); }); websocket.on('error', () => { stopReason = 'websocket_error'; stopProcess(ffmpeg); }); function handleFrameData(chunk) { frameBuffer = Buffer.concat([frameBuffer, chunk]); for (;;) { const start = frameBuffer.indexOf(JPEG_SOI); if (start === -1) { frameBuffer = Buffer.alloc(0); return; } const end = frameBuffer.indexOf(JPEG_EOI, start + JPEG_SOI.length); if (end === -1) { frameBuffer = start === 0 ? frameBuffer : frameBuffer.subarray(start); return; } const jpeg = frameBuffer.subarray(start, end + JPEG_EOI.length); frameBuffer = frameBuffer.subarray(end + JPEG_EOI.length); const timestamp = frameIndex / fps; frameIndex += 1; if (!isWebSocketOpen(websocket)) { return; } const sendResult = sendFramePacket(websocket, timestamp, jpeg, () => { stopReason = 'websocket_send_error'; stopProcess(ffmpeg); }); if (sendResult === 'closed') { return; } if (sendResult === 'skipped') { skippedFrames += 1; } } } } function createRelayPlayback(session) { const { fps, quality, width } = session.options; const label = `relay:${shortId(session.id)}`; let audioResponse = null; let websocket = null; let audioFfmpeg = null; let frameFfmpeg = null; let audioInput = null; let frameInput = null; let audioLogger = null; let frameLogger = null; let sourceController = null; let sourceBytes = 0; let frameBuffer = Buffer.alloc(0); let frameIndex = 0; let skippedFrames = 0; let stopReason = 'process_exit'; let started = false; let stopping = false; let closed = false; let sourceEnded = false; let audioClosed = false; let frameClosed = false; let frameExitCode = null; let frameExitSignal = null; let frameEndSent = false; let audioResponseFinished = false; let serverClosingWebSocket = false; let readyTimer = null; const audioStderr = createFfmpegStderrTail(); const frameStderr = createFfmpegStderrTail(); const playback = { get closed() { return closed; }, stop(reason = 'playback_stopped') { stop(reason); }, attachAudio(request, response) { if (closed) { response.status(410).end(); return; } if (audioResponse && !audioResponse.writableEnded) { response.status(409).json({ error: 'Audio stream already attached for this session.' }); return; } audioResponse = response; response.set({ 'Cache-Control': 'no-store', 'Content-Type': 'audio/mpeg', 'X-Content-Type-Options': 'nosniff', }); response.flushHeaders(); response.on('finish', () => { audioResponseFinished = true; }); const cleanup = once(() => { if (!closed && !audioResponseFinished && !response.writableEnded) { stop('client_disconnect'); } }); request.on('close', cleanup); response.on('close', cleanup); maybeStart(); }, attachFrames(nextWebsocket) { if (closed) { nextWebsocket.close(1011, 'Playback closed'); return; } if (websocket && websocket.readyState === WebSocket.OPEN) { nextWebsocket.close(1013, 'Frame stream already attached for this session'); return; } websocket = nextWebsocket; sendJson(websocket, { type: 'ready', codec: 'jpeg', fps, quality, width, duration: session.duration, seekable: isSessionSeekable(session), seekSeconds: session.seekSeconds, }); websocket.on('close', () => { if (!closed && !serverClosingWebSocket) { stop('client_disconnect'); } }); websocket.on('error', () => { if (!closed) { stop('websocket_error'); } }); maybeStart(); }, }; readyTimer = setTimeout(() => { if (!started && !closed) { stop('consumer_attach_timeout'); } }, PLAYBACK_READY_TIMEOUT_MS).unref(); return playback; function maybeStart() { if (started || closed || !audioResponse || !isWebSocketOpen(websocket)) { return; } started = true; clearReadyTimer(); audioFfmpeg = spawnFfmpeg(buildRelayAudioArgs(session), ['pipe', 'pipe', 'pipe']); frameFfmpeg = spawnFfmpeg(buildRelayFrameArgs(session), ['pipe', 'pipe', 'pipe']); audioLogger = createFfmpegLogger('relay-audio', session.id, audioFfmpeg.pid); frameLogger = createFfmpegLogger('relay-frames', session.id, frameFfmpeg.pid); audioInput = createRelayInputBranch(audioFfmpeg.stdin, handleRelayInputError); frameInput = createRelayInputBranch(frameFfmpeg.stdin, handleRelayInputError); audioLogger.start(); frameLogger.start(); audioFfmpeg.stdout.on('error', (error) => { logWarn(`audio output error kind=${label} error=${oneLine(error.message)}`); stop('audio_output_error'); }); audioFfmpeg.stdout.pipe(audioResponse); frameFfmpeg.stdout.on('error', (error) => { logWarn(`frame output error kind=${label} error=${oneLine(error.message)}`); stop('frame_output_error'); }); frameFfmpeg.stdout.on('data', handleFrameData); audioFfmpeg.stderr.on('data', (chunk) => { audioStderr.write(chunk); audioLogger.stderr(chunk); }); frameFfmpeg.stderr.on('data', (chunk) => { frameStderr.write(chunk); frameLogger.stderr(chunk); }); audioFfmpeg.on('error', (error) => { audioLogger.error(error); stop('audio_start_error'); }); frameFfmpeg.on('error', (error) => { frameLogger.error(error); stop('frame_start_error'); }); audioFfmpeg.on('close', (code, signal) => { audioClosed = true; audioStderr.flush(); audioLogger.close(code, signal, stopReason, audioStderr.value); maybeFinishRelay(); }); frameFfmpeg.on('close', (code, signal) => { frameClosed = true; frameExitCode = code; frameExitSignal = signal; frameStderr.flush(); frameLogger.close(code, signal, stopReason, frameStderr.value); sendFrameEndIfNeeded(); maybeFinishRelay(); }); void startRelaySource(); } async function startRelaySource() { const startedAt = Date.now(); let upstreamStatus = 'pending'; let upstreamEnded = false; sourceController = new AbortController(); try { const headers = new Headers(); headers.set('accept-encoding', 'identity'); const upstream = await fetch(session.url, { headers, redirect: 'follow', signal: sourceController.signal, }); upstreamStatus = String(upstream.status); logInfo(`relay source connected kind=${label} status=${upstream.status} contentType=${upstream.headers.get('content-type') ?? 'unknown'}`); if (!upstream.ok || !upstream.body) { throw new Error(`Source returned HTTP ${upstream.status}`); } for await (const chunk of Readable.fromWeb(upstream.body)) { if (closed || stopping) { return; } const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk); sourceBytes += buffer.length; if (!audioInput.write(buffer) || !frameInput.write(buffer)) { stop('relay_input_queue_overflow'); return; } await waitForRelayCapacity([audioInput, frameInput], () => closed || stopping); } upstreamEnded = true; sourceEnded = true; audioInput.end(); frameInput.end(); } catch (error) { if (sourceController.signal.aborted || closed) { return; } logWarn(`relay source failed kind=${label} error=${oneLine(error.message)}`); stop('relay_source_error'); } finally { logInfo(`relay source closed kind=${label} status=${upstreamStatus} bytes=${sourceBytes} upstreamEnded=${upstreamEnded} durationMs=${Date.now() - startedAt}`); } } function handleRelayInputError(error) { if (!closed && !stopping) { logWarn(`relay input error kind=${label} error=${oneLine(error.message)}`); stop('relay_input_error'); } } function handleFrameData(chunk) { frameBuffer = Buffer.concat([frameBuffer, chunk]); for (;;) { const start = frameBuffer.indexOf(JPEG_SOI); if (start === -1) { frameBuffer = Buffer.alloc(0); return; } const end = frameBuffer.indexOf(JPEG_EOI, start + JPEG_SOI.length); if (end === -1) { frameBuffer = start === 0 ? frameBuffer : frameBuffer.subarray(start); return; } const jpeg = frameBuffer.subarray(start, end + JPEG_EOI.length); frameBuffer = frameBuffer.subarray(end + JPEG_EOI.length); const timestamp = frameIndex / fps; frameIndex += 1; if (!isWebSocketOpen(websocket)) { return; } const sendResult = sendFramePacket(websocket, timestamp, jpeg, (error) => { if (error && !closed) { stop('websocket_send_error'); } }); if (sendResult === 'closed') { return; } if (sendResult === 'skipped') { skippedFrames += 1; } } } function stop(reason) { if (closed) { return; } stopReason = reason; stopping = true; sourceController?.abort(); audioInput?.destroy(); frameInput?.destroy(); if (isChildRunning(audioFfmpeg)) { stopProcess(audioFfmpeg); } if (isChildRunning(frameFfmpeg)) { stopProcess(frameFfmpeg); } if (!isChildRunning(audioFfmpeg) && !isChildRunning(frameFfmpeg)) { finishRelay(1000, 'relay stopped'); } } function maybeFinishRelay() { if (closed) { return; } if (!stopping && !sourceEnded) { stop('relay_process_exit'); return; } if (audioClosed && frameClosed) { finishRelay(1000, 'relay ended'); } } function sendFrameEndIfNeeded() { if (frameEndSent || !isWebSocketOpen(websocket)) { return; } frameEndSent = true; sendJson(websocket, { type: 'end', code: frameExitCode, signal: frameExitSignal, skippedFrames, message: summarizeFfmpegExit(frameExitCode, frameExitSignal, frameStderr.value), }); serverClosingWebSocket = true; websocket.close(1000, 'ffmpeg exited'); } function finishRelay(websocketCode, websocketReason) { if (closed) { return; } closed = true; clearReadyTimer(); sourceController?.abort(); audioInput?.destroy(); frameInput?.destroy(); if (playbacks.get(session.id) === playback) { playbacks.delete(session.id); } if (audioResponse && !audioResponse.writableEnded) { audioResponse.end(); } if (isWebSocketOpen(websocket) && frameEndSent) { serverClosingWebSocket = true; websocket.close(websocketCode, websocketReason); } else { sendFrameEndIfNeeded(); } logInfo(`relay closed kind=${label} reason=${stopReason} sourceBytes=${sourceBytes} frames=${frameIndex} skippedFrames=${skippedFrames} audioInputPeakBytes=${audioInput?.peakBytes ?? 0} frameInputPeakBytes=${frameInput?.peakBytes ?? 0}`); } function clearReadyTimer() { if (readyTimer) { clearTimeout(readyTimer); readyTimer = null; } } } function createPlayback(session) { const { fps, quality, width } = session.options; const label = `playback:${shortId(session.id)}`; let audioResponse = null; let audioQueue = []; let audioQueueBytes = 0; let audioQueuePeakBytes = 0; let audioBackpressureCount = 0; let audioDrainCount = 0; let audioDrainPending = false; let websocket = null; let ffmpeg = null; let logger = null; let releaseSource = () => {}; let stderr = ''; let frameBuffer = Buffer.alloc(0); let frameIndex = 0; let skippedFrames = 0; let stopReason = 'process_exit'; let started = false; let closed = false; let readyTimer = null; const stderrTail = createFfmpegStderrTail(); const playback = { get closed() { return closed; }, stop(reason = 'playback_stopped') { stop(reason); }, attachAudio(request, response) { if (closed) { response.status(410).end(); return; } if (audioResponse && !audioResponse.writableEnded) { response.status(409).json({ error: 'Audio stream already attached for this session.' }); return; } audioResponse = response; response.set({ 'Cache-Control': 'no-store', 'Content-Type': 'audio/mpeg', 'X-Content-Type-Options': 'nosniff', }); response.flushHeaders(); const cleanup = once(() => { if (!closed) { stop('client_disconnect'); } }); request.on('close', cleanup); response.on('close', cleanup); maybeStart(); }, attachFrames(nextWebsocket) { if (closed) { nextWebsocket.close(1011, 'Playback closed'); return; } if (websocket && websocket.readyState === WebSocket.OPEN) { nextWebsocket.close(1013, 'Frame stream already attached for this session'); return; } websocket = nextWebsocket; sendJson(websocket, { type: 'ready', codec: 'jpeg', fps, quality, width, duration: session.duration, seekable: isSessionSeekable(session), seekSeconds: session.seekSeconds, }); websocket.on('close', () => { if (!closed) { stop('client_disconnect'); } }); websocket.on('error', () => { if (!closed) { stop('websocket_error'); } }); maybeStart(); }, }; readyTimer = setTimeout(() => { if (!started && !closed) { stop('consumer_attach_timeout'); } }, PLAYBACK_READY_TIMEOUT_MS).unref(); return playback; function maybeStart() { if (started || closed || !audioResponse || !isWebSocketOpen(websocket)) { return; } started = true; clearReadyTimer(); const source = createSourceInput(session.id, 'playback'); releaseSource = once(source.release); ffmpeg = spawnFfmpeg(buildPlaybackArgs(session, source.url), ['ignore', 'pipe', 'pipe', 'pipe']); logger = createFfmpegLogger('playback', session.id, ffmpeg.pid); logger.start(); const frameStream = ffmpeg.stdio[3]; ffmpeg.stdout.on('data', handleAudioData); ffmpeg.stdout.on('error', (error) => { logWarn(`audio output error kind=${label} error=${oneLine(error.message)}`); stop('audio_output_error'); }); frameStream.on('error', (error) => { logWarn(`frame output error kind=${label} error=${oneLine(error.message)}`); stop('frame_output_error'); }); frameStream.on('data', handleFrameData); ffmpeg.stderr.on('data', (chunk) => { stderrTail.write(chunk); logger.stderr(chunk); }); ffmpeg.on('error', (error) => { stopReason = 'start_error'; logger.error(error); finishPlayback(1011, `Failed to start ffmpeg: ${error.message}`, null, null); }); ffmpeg.on('close', (code, signal) => { stderrTail.flush(); stderr = stderrTail.value; logger.close(code, signal, stopReason, stderr); finishPlayback(1000, 'ffmpeg exited', code, signal); }); } function handleAudioData(chunk) { if (closed) { return; } if (!audioResponse || audioResponse.writableEnded) { stop('audio_response_closed'); return; } audioQueue.push(chunk); audioQueueBytes += chunk.length; audioQueuePeakBytes = Math.max(audioQueuePeakBytes, audioQueueBytes); if (audioQueueBytes > MAX_AUDIO_QUEUE_BYTES) { logWarn(`audio queue overflow kind=${label} bytes=${audioQueueBytes} maxBytes=${MAX_AUDIO_QUEUE_BYTES}`); stop('audio_queue_overflow'); return; } flushAudioQueue(); } function flushAudioQueue() { if (closed || !audioResponse || audioResponse.writableEnded) { return; } while (audioQueue.length > 0) { const chunk = audioQueue.shift(); audioQueueBytes -= chunk.length; let canContinue = false; try { canContinue = audioResponse.write(chunk); } catch (error) { logWarn(`audio response write failed kind=${label} error=${oneLine(error.message)}`); stop('audio_response_write_error'); return; } if (!canContinue) { audioBackpressureCount += 1; waitForAudioDrain(); return; } } } function waitForAudioDrain() { if (audioDrainPending || !audioResponse || audioResponse.writableEnded) { return; } audioDrainPending = true; audioResponse.once('drain', () => { audioDrainPending = false; audioDrainCount += 1; flushAudioQueue(); }); } function handleFrameData(chunk) { frameBuffer = Buffer.concat([frameBuffer, chunk]); for (;;) { const start = frameBuffer.indexOf(JPEG_SOI); if (start === -1) { frameBuffer = Buffer.alloc(0); return; } const end = frameBuffer.indexOf(JPEG_EOI, start + JPEG_SOI.length); if (end === -1) { frameBuffer = start === 0 ? frameBuffer : frameBuffer.subarray(start); return; } const jpeg = frameBuffer.subarray(start, end + JPEG_EOI.length); frameBuffer = frameBuffer.subarray(end + JPEG_EOI.length); const timestamp = frameIndex / fps; frameIndex += 1; if (!isWebSocketOpen(websocket)) { return; } const sendResult = sendFramePacket(websocket, timestamp, jpeg, (error) => { if (error && !closed) { stop('websocket_send_error'); } }); if (sendResult === 'closed') { return; } if (sendResult === 'skipped') { skippedFrames += 1; } } } function stop(reason) { stopReason = reason; if (ffmpeg && ffmpeg.exitCode === null && ffmpeg.signalCode === null) { stopProcess(ffmpeg); return; } finishPlayback(1000, 'playback stopped', null, null); } function finishPlayback(websocketCode, websocketReason, ffmpegCode, ffmpegSignal) { if (closed) { return; } closed = true; clearReadyTimer(); releaseSource(); if (playbacks.get(session.id) === playback) { playbacks.delete(session.id); } audioQueue = []; audioQueueBytes = 0; if (audioResponse && !audioResponse.writableEnded) { audioResponse.end(); } if (isWebSocketOpen(websocket)) { sendJson(websocket, { type: 'end', code: ffmpegCode, signal: ffmpegSignal, skippedFrames, message: summarizeFfmpegExit(ffmpegCode, ffmpegSignal, stderr), }); websocket.close(websocketCode, websocketReason); } logInfo(`playback closed kind=${label} reason=${stopReason} frames=${frameIndex} skippedFrames=${skippedFrames} audioQueuePeakBytes=${audioQueuePeakBytes} audioBackpressureCount=${audioBackpressureCount} audioDrainCount=${audioDrainCount}`); } function clearReadyTimer() { if (readyTimer) { clearTimeout(readyTimer); readyTimer = null; } } } function isWebSocketOpen(websocket) { return websocket?.readyState === WebSocket.OPEN; } function sendFramePacket(websocket, timestamp, jpeg, onError) { if (!isWebSocketOpen(websocket)) { return 'closed'; } const packetBytes = 8 + jpeg.length; if (websocket.bufferedAmount > 0 && websocket.bufferedAmount + packetBytes > MAX_WS_BUFFER_BYTES) { return 'skipped'; } const packet = Buffer.allocUnsafe(packetBytes); packet.writeDoubleLE(timestamp, 0); jpeg.copy(packet, 8); websocket.send(packet, { binary: true }, (error) => { if (error) { onError(error); } }); return 'sent'; } function createSourceInput(sessionId, kind) { const token = randomUUID(); sourceTokens.set(token, { sessionId, kind, createdAt: Date.now() }); return { url: `http://127.0.0.1:${getListeningPort()}/_source/${token}`, release: () => sourceTokens.delete(token), }; } function getListeningPort() { const address = server.address(); return typeof address === 'object' && address ? address.port : PORT; } function createAudioWorker(session) { const source = createSourceInput(session.id, 'audio'); return { args: buildAudioArgs(session, source.url), release: source.release, }; } function createFrameWorker(session) { const source = createSourceInput(session.id, 'frames'); return { args: buildFrameArgs(session, source.url), release: source.release, }; } function buildRelayAudioArgs(session) { return buildAudioArgs(session, 'pipe:0', { seekable: false, startTime: 0 }); } function buildRelayFrameArgs(session) { return buildFrameArgs(session, 'pipe:0', { seekable: false, startTime: 0 }); } function buildPlaybackArgs(session, inputUrl) { return [ ...buildInputArgs(inputUrl, { startTime: session.seekSeconds }), '-map', '0:a:0?', '-vn', '-ac', '2', '-ar', '48000', '-codec:a', 'libmp3lame', '-b:a', session.options.audioBitrate, '-f', 'mp3', 'pipe:1', '-map', '0:v:0', ...buildFrameOutputArgs(session, 'pipe:3'), ]; } function buildAudioArgs(session, inputUrl, inputOptions) { return [ ...buildInputArgs(inputUrl, { ...inputOptions, startTime: inputOptions?.startTime ?? session.seekSeconds }), '-map', '0:a:0?', '-vn', '-ac', '2', '-ar', '48000', '-codec:a', 'libmp3lame', '-b:a', session.options.audioBitrate, '-f', 'mp3', 'pipe:1', ]; } function buildFrameArgs(session, inputUrl, inputOptions) { return [ ...buildInputArgs(inputUrl, { ...inputOptions, startTime: inputOptions?.startTime ?? session.seekSeconds }), '-map', '0:v:0', ...buildFrameOutputArgs(session, 'pipe:1'), ]; } function buildInputArgs(inputUrl, { seekable = true, startTime = 0 } = {}) { const args = [ '-hide_banner', '-nostdin', '-loglevel', FFMPEG_LOG_LEVEL, '-nostats', ]; if (seekable) { args.push('-seekable', startTime > 0 ? '1' : FFMPEG_INPUT_SEEKABLE); } if (startTime > 0) { args.push('-ss', formatFfmpegSeconds(startTime)); } args.push('-re', '-i', inputUrl); return args; } function formatFfmpegSeconds(seconds) { return clampNumber(seconds, 0, Number.MAX_SAFE_INTEGER).toFixed(3); } function buildFrameOutputArgs(session, outputUrl) { const { fps, quality, width } = session.options; const videoFilter = `fps=${fps},scale=w='min(${width},iw)':h=-2:flags=bicubic:out_range=pc,format=yuvj420p`; return [ '-an', '-vf', videoFilter, '-codec:v', 'mjpeg', '-pix_fmt', 'yuvj420p', '-color_range', 'pc', '-q:v', String(quality), '-f', 'image2pipe', outputUrl, ]; } function spawnFfmpeg(args, stdio = ['ignore', 'pipe', 'pipe']) { return spawn(FFMPEG_PATH, args, { stdio, }); } function createFfmpegLogger(kind, sessionId, pid) { const startedAt = Date.now(); const label = `${kind}:${shortId(sessionId)}`; const lineLogger = createLineLogger((line) => { if (shouldSuppressFfmpegLogLine(line)) { return; } logWarn(`ffmpeg stderr kind=${label} pid=${pid ?? 'unknown'} line="${oneLine(redactSecrets(line))}"`); }); return { start() { logInfo(`ffmpeg started kind=${label} pid=${pid ?? 'unknown'} loglevel=${FFMPEG_LOG_LEVEL}`); }, stderr(chunk) { lineLogger.write(chunk); }, error(error) { logWarn(`ffmpeg start error kind=${label} pid=${pid ?? 'unknown'} error=${oneLine(error.message)}`); }, close(code, signal, reason, stderr) { lineLogger.flush(); const level = reason === 'client_disconnect' || code === 0 || signal === 'SIGTERM' ? 'info' : 'warn'; const tail = redactSecrets(stderr).trim(); const tailText = tail ? ` stderrTail="${oneLine(tail)}"` : ''; log(level, `ffmpeg exited kind=${label} pid=${pid ?? 'unknown'} code=${code ?? 'null'} signal=${signal ?? 'none'} reason=${reason} durationMs=${Date.now() - startedAt}${tailText}`); }, }; } function shouldSuppressFfmpegLogLine(line) { return line.includes('deprecated pixel format used, make sure you did set range correctly'); } function createFfmpegStderrTail() { let value = ''; const lineLogger = createLineLogger((line) => { if (!shouldSuppressFfmpegLogLine(line)) { value = appendTail(value, `${line}\n`); } }); return { get value() { return value; }, write(chunk) { lineLogger.write(chunk); }, flush() { lineLogger.flush(); }, }; } function createRelayInputBranch(stream, onError) { let accepting = true; let ending = false; let drainPending = false; let streamPendingBytes = 0; let queue = []; let queueBytes = 0; let peakBytes = 0; let waiters = []; stream.on('drain', () => { drainPending = false; streamPendingBytes = 0; updatePeakBytes(); notifyWaiters(); flush(); }); stream.on('error', (error) => { accepting = false; streamPendingBytes = 0; notifyWaiters(); onError(error); }); stream.on('close', () => { accepting = false; streamPendingBytes = 0; notifyWaiters(); }); return { get queueBytes() { return getPendingBytes(); }, get peakBytes() { return peakBytes; }, write(chunk) { if (!accepting || stream.destroyed) { return false; } if (queue.length === 0 && !drainPending) { return writeNow(chunk); } queue.push(chunk); queueBytes += chunk.length; updatePeakBytes(); notifyWaiters(); return getPendingBytes() <= MAX_RELAY_BRANCH_QUEUE_BYTES; }, end() { accepting = false; ending = true; flush(); }, destroy() { accepting = false; ending = false; queue = []; queueBytes = 0; streamPendingBytes = 0; notifyWaiters(); if (!stream.destroyed) { stream.destroy(); } }, waitForCapacity() { if (getPendingBytes() <= RELAY_BRANCH_PAUSE_BYTES || !accepting || stream.destroyed) { return Promise.resolve(); } return new Promise((resolve) => { waiters.push(resolve); }); }, }; function writeNow(chunk) { try { if (!stream.write(chunk)) { drainPending = true; streamPendingBytes += chunk.length; updatePeakBytes(); } notifyWaiters(); return true; } catch (error) { accepting = false; streamPendingBytes = 0; notifyWaiters(); onError(error); return false; } } function flush() { while (queue.length > 0 && !drainPending && !stream.destroyed) { const chunk = queue.shift(); queueBytes -= chunk.length; if (!writeNow(chunk)) { return; } notifyWaiters(); } if (ending && queue.length === 0 && !drainPending && !stream.destroyed) { stream.end(); } } function notifyWaiters() { if (getPendingBytes() > RELAY_BRANCH_PAUSE_BYTES && accepting && !stream.destroyed) { return; } const currentWaiters = waiters; waiters = []; for (const resolve of currentWaiters) { resolve(); } } function getPendingBytes() { return queueBytes + streamPendingBytes; } function updatePeakBytes() { peakBytes = Math.max(peakBytes, getPendingBytes()); } } async function waitForRelayCapacity(branches, shouldStop) { while (!shouldStop()) { const blockedBranches = branches.filter((branch) => branch.queueBytes > RELAY_BRANCH_PAUSE_BYTES); if (blockedBranches.length === 0) { return; } await Promise.race(blockedBranches.map((branch) => branch.waitForCapacity())); } } function createLineLogger(callback) { let buffer = ''; return { write(chunk) { buffer += chunk.toString('utf8'); const lines = buffer.split(/\r?\n/); buffer = lines.pop() ?? ''; for (const line of lines) { if (line.trim()) { callback(line); } } }, flush() { if (buffer.trim()) { callback(buffer); } buffer = ''; }, }; } function copyUpstreamHeaders(upstreamHeaders, response) { for (const header of [ 'accept-ranges', 'content-length', 'content-range', 'content-type', 'etag', 'last-modified', ]) { const value = upstreamHeaders.get(header); if (value) { response.setHeader(header, value); } } response.setHeader('cache-control', 'no-store'); } function sendJson(websocket, payload) { if (websocket.readyState === WebSocket.OPEN) { websocket.send(JSON.stringify(payload)); } } function appendTail(current, chunk) { const next = current + chunk.toString('utf8'); return next.length > 4000 ? next.slice(-4000) : next; } function shortId(id) { return id.slice(0, 8); } function oneLine(value) { return String(value).replace(/\s+/g, ' ').trim().slice(0, 1200); } function logInfo(message) { log('info', message); } function logWarn(message) { log('warn', message); } function log(level, message) { const output = `${new Date().toISOString()} ${message}`; if (level === 'warn') { console.warn(output); return; } console.log(output); } function summarizeFfmpegExit(code, signal, stderr) { if (signal) { return `ffmpeg stopped by ${signal}.`; } if (code === 0 || code === null) { return 'Frame stream ended.'; } const detail = redactSecrets(stderr).trim(); return detail ? `ffmpeg exited with code ${code}: ${detail}` : `ffmpeg exited with code ${code}.`; } function redactSecrets(text) { return text.replace(/([?&](?:api_key|apikey|access_token|token|key)=)[^&\s]+/gi, '$1[redacted]'); } function isChildRunning(child) { return Boolean(child && child.exitCode === null && child.signalCode === null); } function stopProcess(child) { if (!child || child.exitCode !== null || child.signalCode !== null) { return; } child.kill('SIGTERM'); setTimeout(() => { if (child.exitCode === null && child.signalCode === null) { child.kill('SIGKILL'); } }, 1500).unref(); } function once(callback) { let called = false; return (...args) => { if (called) { return; } called = true; callback(...args); }; }