import { spawn } from 'node:child_process';
import { randomUUID } from 'node:crypto';
import { createServer } from 'node:http';
import path from 'node:path';
import { Readable } from 'node:stream';
import { fileURLToPath } from 'node:url';

import express from 'express';
import { WebSocket, WebSocketServer } from 'ws';

/*
 * Frame stream server.
 *
 * A client registers a remote http(s) media URL via POST /api/session and then
 * consumes it through two channels, each backed by its own ffmpeg process:
 *   - GET /audio/:sessionId  -> MP3 audio over a plain HTTP response
 *   - WS  /frames/:sessionId -> JPEG frames, one binary message per frame
 *
 * ffmpeg never receives the remote URL directly. It is pointed at the local
 * /_source/:token proxy route, which resolves a per-worker token back to the
 * session URL and relays the upstream response.
 */

const __dirname = path.dirname(fileURLToPath(import.meta.url));
const publicDir = path.join(__dirname, '..', 'public');

const app = express();
const server = createServer(app);
// noServer: upgrades are routed manually in the 'upgrade' handler below.
const wss = new WebSocketServer({ noServer: true });

const PORT = Number(process.env.PORT ?? 3000);
const FFMPEG_PATH = process.env.FFMPEG_PATH ?? 'ffmpeg';
// Idle lifetime of a session and maximum age of a source token (one hour).
const SESSION_TTL_MS = 60 * 60 * 1000;
// Frames are dropped while the websocket has more than this many bytes queued.
const MAX_WS_BUFFER_BYTES = 12 * 1024 * 1024;
// JPEG start-of-image / end-of-image markers used to split ffmpeg's output.
const JPEG_SOI = Buffer.from([0xff, 0xd8]);
const JPEG_EOI = Buffer.from([0xff, 0xd9]);

// Playback options applied when the client omits or sends unusable values.
const defaults = {
  fps: 24,
  width: 960,
  quality: 5,
  audioBitrate: '160k',
};

// sessionId -> { id, url, options, createdAt, lastUsedAt }
const sessions = new Map();
// token -> { sessionId, createdAt }; one token per running ffmpeg worker.
const sourceTokens = new Map();

app.disable('x-powered-by');
app.use(express.json({ limit: '32kb' }));
app.use(express.static(publicDir));

// Liveness probe; also reports which ffmpeg binary path is configured.
app.get('/api/health', (_request, response) => {
  response.json({ ok: true, ffmpeg: FFMPEG_PATH });
});

// Registers a stream URL plus playback options and returns the new session id.
app.post('/api/session', (request, response) => {
  let url;

  try {
    url = parseStreamUrl(request.body?.url);
  } catch (error) {
    response.status(400).json({ error: error.message });
    return;
  }

  const options = parsePlaybackOptions(request.body);
  const id = randomUUID();

  sessions.set(id, {
    id,
    url,
    options,
    createdAt: Date.now(),
    lastUsedAt: Date.now(),
  });

  response.status(201).json({ id, options });
});

// Proxy used as ffmpeg's input: resolves a source token to its session URL and
// relays the upstream response. HEAD stays HEAD; every other method becomes GET.
app.all('/_source/:token', async (request, response) => {
  const source = sourceTokens.get(request.params.token);
  const session = source ? getSession(source.sessionId) : null;

  if (!session) {
    response.status(404).end();
    return;
  }

  // Abort the upstream fetch as soon as the downstream response closes.
  const controller = new AbortController();
  const cleanup = once(() => controller.abort());
  response.on('close', cleanup);

  try {
    const headers = new Headers();
    const range = request.get('range');

    if (range) {
      headers.set('range', range);
    }

    // Ask for an unencoded body so the relayed content-length stays accurate.
    headers.set('accept-encoding', 'identity');

    const upstream = await fetch(session.url, {
      method: request.method === 'HEAD' ? 'HEAD' : 'GET',
      headers,
      redirect: 'follow',
      signal: controller.signal,
    });

    response.status(upstream.status);
    copyUpstreamHeaders(upstream.headers, response);

    if (request.method === 'HEAD' || !upstream.body) {
      response.end();
      return;
    }

    Readable.fromWeb(upstream.body)
      .on('error', (error) => {
        if (!response.destroyed) {
          response.destroy(error);
        }
      })
      .pipe(response);
  } catch (error) {
    // An abort means the consumer went away; there is nobody left to answer.
    if (controller.signal.aborted) {
      return;
    }

    console.error(`Source proxy failed: ${error.message}`);

    if (!response.headersSent) {
      response.status(502).json({ error: 'Failed to fetch source stream.' });
    } else {
      response.destroy(error);
    }
  }
});

// Streams the session's audio as MP3 for as long as the client stays connected.
app.get('/audio/:sessionId', (request, response) => {
  const session = getSession(request.params.sessionId);

  if (!session) {
    response.status(404).json({ error: 'Unknown or expired session.' });
    return;
  }

  const worker = createAudioWorker(session);
  const releaseWorker = once(worker.release);
  const ffmpeg = spawnFfmpeg(worker.args);
  let stderr = '';

  response.set({
    'Cache-Control': 'no-store',
    'Content-Type': 'audio/mpeg',
    'X-Content-Type-Options': 'nosniff',
  });
  response.flushHeaders();

  ffmpeg.stderr.on('data', (chunk) => {
    stderr = appendTail(stderr, chunk);
  });

  ffmpeg.stdout.pipe(response);

  ffmpeg.on('error', (error) => {
    releaseWorker();
    console.error(`Failed to start ffmpeg audio worker: ${error.message}`);

    if (!response.writableEnded) {
      response.end();
    }
  });

  ffmpeg.on('close', (code, signal) => {
    releaseWorker();

    // NOTE(review): 255 is presumably the status ffmpeg reports after being
    // interrupted by stopProcess, hence not logged as a failure - confirm.
    if (code && code !== 255) {
      console.warn(`ffmpeg audio worker exited with code ${code}: ${redactSecrets(stderr)}`);
    }

    if (signal) {
      console.warn(`ffmpeg audio worker stopped by ${signal}`);
    }

    if (!response.writableEnded) {
      response.end();
    }
  });

  // Whichever side closes first stops ffmpeg; `once` makes the second a no-op.
  const cleanup = once(() => stopProcess(ffmpeg));
  request.on('close', cleanup);
  response.on('close', cleanup);
});

// Routes websocket upgrades: only /frames/<sessionId> for a live session is
// accepted, everything else is answered with a bare 404.
server.on('upgrade', (request, socket, head) => {
  const sessionId = matchFrameSessionId(request.url);

  if (!sessionId || !getSession(sessionId)) {
    socket.write('HTTP/1.1 404 Not Found\r\n\r\n');
    socket.destroy();
    return;
  }

  wss.handleUpgrade(request, socket, head, (websocket) => {
    wss.emit('connection', websocket, request, sessionId);
  });
});

// Per-connection frame pump: spawns ffmpeg, splits its stdout into JPEG frames
// and forwards each one as [8-byte little-endian double timestamp][JPEG bytes].
wss.on('connection', (websocket, _request, sessionId) => {
  const session = getSession(sessionId);

  if (!session) {
    websocket.close(1008, 'Unknown session');
    return;
  }

  const worker = createFrameWorker(session);
  const releaseWorker = once(worker.release);
  const ffmpeg = spawnFfmpeg(worker.args);
  const { fps, quality, width } = session.options;
  let buffer = Buffer.alloc(0);
  let frameIndex = 0;
  let skippedFrames = 0;
  let stderr = '';
  let closedByClient = false;

  sendJson(websocket, {
    type: 'ready',
    codec: 'jpeg',
    fps,
    quality,
    width,
  });

  ffmpeg.stdout.on('data', (chunk) => {
    buffer = Buffer.concat([buffer, chunk]);

    for (;;) {
      const { jpeg, rest } = takeNextJpeg(buffer);
      buffer = rest;

      if (!jpeg) {
        return;
      }

      // The timestamp is derived from the frame count, so a frame skipped for
      // backpressure still advances the clock for the frames that follow it.
      const timestamp = frameIndex / fps;
      frameIndex += 1;

      if (websocket.readyState !== WebSocket.OPEN) {
        return;
      }

      if (websocket.bufferedAmount > MAX_WS_BUFFER_BYTES) {
        skippedFrames += 1;
        continue;
      }

      const packet = Buffer.allocUnsafe(8 + jpeg.length);
      packet.writeDoubleLE(timestamp, 0);
      jpeg.copy(packet, 8);
      websocket.send(packet, { binary: true });
    }
  });

  ffmpeg.stderr.on('data', (chunk) => {
    stderr = appendTail(stderr, chunk);
  });

  ffmpeg.on('error', (error) => {
    releaseWorker();
    sendJson(websocket, {
      type: 'error',
      message: `Failed to start ffmpeg: ${error.message}`,
    });
    websocket.close(1011, 'ffmpeg start failed');
  });

  ffmpeg.on('close', (code, signal) => {
    releaseWorker();

    if (websocket.readyState === WebSocket.OPEN && !closedByClient) {
      sendJson(websocket, {
        type: 'end',
        code,
        signal,
        skippedFrames,
        message: summarizeFfmpegExit(code, signal, stderr),
      });
      websocket.close(1000, 'ffmpeg exited');
    }
  });

  websocket.on('close', () => {
    closedByClient = true;
    stopProcess(ffmpeg);
  });

  websocket.on('error', () => {
    closedByClient = true;
    stopProcess(ffmpeg);
  });
});

// Periodic sweep of idle sessions and over-age source tokens. The timer is
// unref'ed so it never keeps the process alive on its own.
setInterval(() => {
  const now = Date.now();

  for (const [id, session] of sessions) {
    if (now - session.lastUsedAt > SESSION_TTL_MS) {
      sessions.delete(id);
    }
  }

  for (const [token, source] of sourceTokens) {
    if (now - source.createdAt > SESSION_TTL_MS) {
      sourceTokens.delete(token);
    }
  }
}, 60 * 1000).unref();

server.listen(PORT, () => {
  console.log(`Frame stream app listening at http://localhost:${PORT}`);
});

/**
 * Validates a client-supplied stream URL.
 *
 * NOTE(review): any http(s) host is accepted, including loopback and private
 * addresses, and the /_source proxy follows redirects. If this server is
 * reachable by untrusted clients that is an SSRF vector; consider an allow-list
 * or resolved-address filtering.
 *
 * @param {unknown} value - Raw `url` field from the request body.
 * @returns {string} The normalized URL string.
 * @throws {Error} When the value is missing, blank, longer than 4096
 *   characters, or not http/https.
 * @throws {TypeError} When the value cannot be parsed as a URL.
 */
function parseStreamUrl(value) {
  if (typeof value !== 'string' || value.trim().length === 0) {
    throw new Error('A stream URL is required.');
  }

  if (value.length > 4096) {
    throw new Error('The stream URL is too long.');
  }

  const parsed = new URL(value.trim());

  if (!['http:', 'https:'].includes(parsed.protocol)) {
    throw new Error('Only http and https stream URLs are supported.');
  }

  return parsed.toString();
}

/**
 * Builds the playback options for a session from the request body, replacing
 * unusable values with `defaults` and clamping numbers to their allowed range.
 *
 * @param {object | undefined} body - Parsed JSON request body.
 * @returns {{ fps: number, width: number, quality: number, audioBitrate: string }}
 */
function parsePlaybackOptions(body) {
  return {
    fps: clampInteger(body?.fps, defaults.fps, 1, 30),
    width: clampInteger(body?.width, defaults.width, 160, 1920),
    quality: clampInteger(body?.quality, defaults.quality, 2, 18),
    audioBitrate: parseAudioBitrate(body?.audioBitrate),
  };
}

/**
 * Converts a client value to an integer within [min, max].
 *
 * Only numbers and non-blank strings are considered. Without that guard
 * `Number()` would coerce `null`, `''`, `[]` and booleans to 0 or 1, which
 * would then be clamped to `min` instead of falling back to the default.
 *
 * @param {unknown} value - Candidate value.
 * @param {number} fallback - Returned when the value is not a finite number.
 * @param {number} min - Inclusive lower bound.
 * @param {number} max - Inclusive upper bound.
 * @returns {number} The rounded, clamped value, or `fallback`.
 */
function clampInteger(value, fallback, min, max) {
  const isNonBlankString = typeof value === 'string' && value.trim().length > 0;

  if (typeof value !== 'number' && !isNonBlankString) {
    return fallback;
  }

  const parsed = Number(value);

  if (!Number.isFinite(parsed)) {
    return fallback;
  }

  return Math.min(max, Math.max(min, Math.round(parsed)));
}

/**
 * Accepts bitrates written as two or three digits followed by `k` (any case)
 * and lower-cases them; everything else yields the default bitrate.
 *
 * @param {unknown} value - Candidate bitrate.
 * @returns {string} A bitrate string such as `'160k'`.
 */
function parseAudioBitrate(value) {
  if (typeof value !== 'string') {
    return defaults.audioBitrate;
  }

  return /^\d{2,3}k$/i.test(value) ? value.toLowerCase() : defaults.audioBitrate;
}

/**
 * Looks up a session and marks it as used, which postpones its expiry.
 *
 * @param {string} sessionId - Session identifier.
 * @returns {object | null} The session record, or null when unknown/expired.
 */
function getSession(sessionId) {
  const session = sessions.get(sessionId);

  if (!session) {
    return null;
  }

  session.lastUsedAt = Date.now();
  return session;
}

/**
 * Extracts the session id from a websocket upgrade request target.
 *
 * Parsing is done against a fixed base URL and guarded: the request target and
 * Host header are attacker-controlled, and an unparsable value must lead to a
 * rejected upgrade rather than an exception escaping the 'upgrade' listener.
 *
 * @param {string | undefined} requestUrl - The raw `request.url`.
 * @returns {string | null} The id from `/frames/<id>`, or null when the target
 *   is malformed or does not match.
 */
function matchFrameSessionId(requestUrl) {
  let pathname;

  try {
    ({ pathname } = new URL(requestUrl ?? '/', 'http://localhost'));
  } catch {
    return null;
  }

  const match = pathname.match(/^\/frames\/([0-9a-f-]+)$/i);
  return match ? match[1] : null;
}

/**
 * Pulls the next complete JPEG image off the front of an ffmpeg output buffer.
 *
 * Images are delimited by a raw byte scan for the SOI and EOI markers. Bytes
 * before the first SOI are discarded.
 *
 * @param {Buffer} buffer - Bytes accumulated so far.
 * @returns {{ jpeg: Buffer | null, rest: Buffer }} `jpeg` is null when no
 *   complete image is available yet; `rest` is what must be kept for the next
 *   call.
 */
function takeNextJpeg(buffer) {
  const start = buffer.indexOf(JPEG_SOI);

  if (start === -1) {
    // A trailing 0xFF may be the first half of an SOI marker whose second byte
    // arrives in the next chunk; dropping it would lose that whole frame.
    const endsWithMarkerPrefix =
      buffer.length > 0 && buffer[buffer.length - 1] === JPEG_SOI[0];

    return {
      jpeg: null,
      rest: endsWithMarkerPrefix ? buffer.subarray(buffer.length - 1) : Buffer.alloc(0),
    };
  }

  const end = buffer.indexOf(JPEG_EOI, start + JPEG_SOI.length);

  if (end === -1) {
    return { jpeg: null, rest: start === 0 ? buffer : buffer.subarray(start) };
  }

  const frameEnd = end + JPEG_EOI.length;

  return {
    jpeg: buffer.subarray(start, frameEnd),
    rest: buffer.subarray(frameEnd),
  };
}

/**
 * Prepares an audio ffmpeg worker: registers a source token and builds the
 * argument list that reads through it.
 *
 * @param {object} session - Session record.
 * @returns {{ args: string[], release: () => boolean }} ffmpeg arguments and a
 *   function that revokes the worker's source token.
 */
function createAudioWorker(session) {
  const source = createSourceInput(session.id);

  return {
    args: buildAudioArgs(session, source.url),
    release: source.release,
  };
}

/**
 * Prepares a video-frame ffmpeg worker; the counterpart of createAudioWorker.
 *
 * @param {object} session - Session record.
 * @returns {{ args: string[], release: () => boolean }} ffmpeg arguments and a
 *   function that revokes the worker's source token.
 */
function createFrameWorker(session) {
  const source = createSourceInput(session.id);

  return {
    args: buildFrameArgs(session, source.url),
    release: source.release,
  };
}

/**
 * Registers a fresh random token for a session and returns the loopback proxy
 * URL that ffmpeg should use as its input.
 *
 * @param {string} sessionId - Session the token resolves to.
 * @returns {{ url: string, release: () => boolean }} Proxy URL and a function
 *   that deletes the token.
 */
function createSourceInput(sessionId) {
  const token = randomUUID();
  sourceTokens.set(token, { sessionId, createdAt: Date.now() });

  return {
    url: `http://127.0.0.1:${getListeningPort()}/_source/${token}`,
    release: () => sourceTokens.delete(token),
  };
}

/**
 * Returns the port the HTTP server is bound to, falling back to the configured
 * PORT when `server.address()` does not yield an address object.
 *
 * @returns {number}
 */
function getListeningPort() {
  const address = server.address();
  return typeof address === 'object' && address ? address.port : PORT;
}

/**
 * Builds the ffmpeg arguments that encode the first audio stream of the input
 * (if any) as stereo 48 kHz MP3 written to stdout.
 *
 * @param {object} session - Session record; supplies `options.audioBitrate`.
 * @param {string} inputUrl - Input URL handed to ffmpeg.
 * @returns {string[]}
 */
function buildAudioArgs(session, inputUrl) {
  return [
    '-hide_banner',
    '-nostdin',
    '-loglevel',
    'warning',
    '-re',
    '-i',
    inputUrl,
    '-vn',
    '-map',
    '0:a:0?',
    '-ac',
    '2',
    '-ar',
    '48000',
    '-codec:a',
    'libmp3lame',
    '-b:a',
    session.options.audioBitrate,
    '-f',
    'mp3',
    'pipe:1',
  ];
}

/**
 * Builds the ffmpeg arguments that emit the video as a sequence of JPEG images
 * on stdout, resampled to `fps` and scaled down to at most `width` pixels wide.
 *
 * @param {object} session - Session record; supplies fps, quality and width.
 * @param {string} inputUrl - Input URL handed to ffmpeg.
 * @returns {string[]}
 */
function buildFrameArgs(session, inputUrl) {
  const { fps, quality, width } = session.options;
  const videoFilter = `fps=${fps},scale='min(${width},iw)':-2:flags=bicubic`;

  return [
    '-hide_banner',
    '-nostdin',
    '-loglevel',
    'warning',
    '-re',
    '-i',
    inputUrl,
    '-an',
    '-vf',
    videoFilter,
    '-codec:v',
    'mjpeg',
    '-q:v',
    String(quality),
    '-f',
    'image2pipe',
    'pipe:1',
  ];
}

/**
 * Starts ffmpeg with stdin ignored and stdout/stderr exposed as pipes.
 *
 * @param {string[]} args - Command-line arguments.
 * @returns {import('node:child_process').ChildProcess}
 */
function spawnFfmpeg(args) {
  return spawn(FFMPEG_PATH, args, {
    stdio: ['ignore', 'pipe', 'pipe'],
  });
}

/**
 * Copies the allow-listed upstream headers onto the proxy response and forces
 * `cache-control: no-store`.
 *
 * @param {Headers} upstreamHeaders - Headers of the upstream fetch response.
 * @param {import('node:http').ServerResponse} response - Proxy response.
 */
function copyUpstreamHeaders(upstreamHeaders, response) {
  for (const header of [
    'accept-ranges',
    'content-length',
    'content-range',
    'content-type',
    'etag',
    'last-modified',
  ]) {
    const value = upstreamHeaders.get(header);

    if (value) {
      response.setHeader(header, value);
    }
  }

  response.setHeader('cache-control', 'no-store');
}

/**
 * Sends a JSON text message, silently skipping sockets that are not open.
 *
 * @param {WebSocket} websocket - Target socket.
 * @param {object} payload - Value to serialize.
 */
function sendJson(websocket, payload) {
  if (websocket.readyState === WebSocket.OPEN) {
    websocket.send(JSON.stringify(payload));
  }
}

/**
 * Appends a stderr chunk to the captured log, keeping only the last 4000
 * characters.
 *
 * @param {string} current - Log captured so far.
 * @param {Buffer} chunk - Newly received bytes.
 * @returns {string}
 */
function appendTail(current, chunk) {
  const next = current + chunk.toString('utf8');
  return next.length > 4000 ? next.slice(-4000) : next;
}

/**
 * Produces the human-readable message sent to the client when the frame
 * worker exits.
 *
 * @param {number | null} code - Exit code.
 * @param {string | null} signal - Terminating signal, if any.
 * @param {string} stderr - Captured stderr tail.
 * @returns {string}
 */
function summarizeFfmpegExit(code, signal, stderr) {
  if (signal) {
    return `ffmpeg stopped by ${signal}.`;
  }

  if (code === 0 || code === null) {
    return 'Frame stream ended.';
  }

  const detail = redactSecrets(stderr).trim();
  return detail ? `ffmpeg exited with code ${code}: ${detail}` : `ffmpeg exited with code ${code}.`;
}

/**
 * Masks the values of credential-like query parameters (api_key, apikey,
 * access_token, token, key) in a piece of text.
 *
 * @param {string} text - Text that may contain URLs.
 * @returns {string}
 */
function redactSecrets(text) {
  return text.replace(/([?&](?:api_key|apikey|access_token|token|key)=)[^&\s]+/gi, '$1[redacted]');
}

/**
 * Asks a child process to terminate with SIGTERM and escalates to SIGKILL if
 * it is still running 1.5 seconds later. Does nothing for a missing or
 * already-exited child.
 *
 * @param {import('node:child_process').ChildProcess | null | undefined} child
 */
function stopProcess(child) {
  if (!child || child.exitCode !== null || child.signalCode !== null) {
    return;
  }

  child.kill('SIGTERM');

  setTimeout(() => {
    if (child.exitCode === null && child.signalCode === null) {
      child.kill('SIGKILL');
    }
  }, 1500).unref();
}

/**
 * Wraps a callback so that only the first invocation runs it; later calls are
 * ignored. The wrapper always returns undefined.
 *
 * @param {(...args: any[]) => void} callback - Function to guard.
 * @returns {(...args: any[]) => void}
 */
function once(callback) {
  let called = false;

  return (...args) => {
    if (called) {
      return;
    }

    called = true;
    callback(...args);
  };
}