import { spawn } from 'node:child_process'; import { randomUUID } from 'node:crypto'; import fs from 'node:fs/promises'; import { createServer } from 'node:http'; import path from 'node:path'; import { Readable } from 'node:stream'; import { fileURLToPath } from 'node:url'; import express from 'express'; import { WebSocket, WebSocketServer } from 'ws'; const __dirname = path.dirname(fileURLToPath(import.meta.url)); const publicDir = path.join(__dirname, '..', 'public'); const app = express(); const server = createServer(app); const wss = new WebSocketServer({ noServer: true, perMessageDeflate: false }); const PORT = Number(process.env.PORT ?? 3000); const FFMPEG_PATH = process.env.FFMPEG_PATH ?? 'ffmpeg'; const FFPROBE_PATH = process.env.FFPROBE_PATH ?? 'ffprobe'; const FFMPEG_LOG_LEVEL = process.env.FFMPEG_LOG_LEVEL ?? 'warning'; const FFMPEG_INPUT_SEEKABLE = process.env.FFMPEG_INPUT_SEEKABLE ?? '0'; const FFMPEG_HTTP_RECONNECT = parseBoolean(process.env.FFMPEG_HTTP_RECONNECT, true); const FFMPEG_HTTP_RECONNECT_DELAY_MAX = clampInteger(process.env.FFMPEG_HTTP_RECONNECT_DELAY_MAX, 2, 0, 30); const FFMPEG_HTTP_RECONNECT_MAX_RETRIES = clampInteger(process.env.FFMPEG_HTTP_RECONNECT_MAX_RETRIES, 4, 0, 20); const FFMPEG_HTTP_RECONNECT_ON_HTTP_ERROR = process.env.FFMPEG_HTTP_RECONNECT_ON_HTTP_ERROR ?? '5xx'; const PLAYBACK_CONNECTION_MODE = parsePlaybackConnectionMode(process.env.PLAYBACK_CONNECTION_MODE ?? process.env.PLAYBACK_MODE); const METADATA_PROBE_ENABLED = parseBoolean(process.env.METADATA_PROBE_ENABLED, PLAYBACK_CONNECTION_MODE !== 'relay'); const METADATA_PROBE_TIMEOUT_MS = clampInteger(process.env.METADATA_PROBE_TIMEOUT_MS, 4 * 1000, 1000, 30 * 1000); const RECENT_URLS_PATH = process.env.RECENT_URLS_PATH ?? path.join(__dirname, '..', 'data', 'recent-urls.json'); const RECENT_URL_LIMIT = clampInteger(process.env.RECENT_URL_LIMIT, 12, 1, 50); const FAVORITES_PATH = process.env.FAVORITES_PATH ?? path.join(__dirname, '..', 'data', 'favorites.json'); const FAVORITES_LIMIT = clampInteger(process.env.FAVORITES_LIMIT, 50, 1, 200); const SESSION_TTL_MS = 60 * 60 * 1000; const PLAYBACK_READY_TIMEOUT_MS = 15 * 1000; const CLIENT_CLOCK_FRAME_LATE_GRACE_SECONDS = 0.25; const FRAME_CONDITION_LOG_INTERVAL_MS = 5000; const MAX_WS_BUFFER_BYTES = clampInteger(process.env.MAX_WS_BUFFER_BYTES, 2 * 1024 * 1024, 128 * 1024, 64 * 1024 * 1024); const MAX_AUDIO_QUEUE_BYTES = clampInteger(process.env.MAX_AUDIO_QUEUE_BYTES, 4 * 1024 * 1024, 256 * 1024, 128 * 1024 * 1024); const MAX_RELAY_BRANCH_QUEUE_BYTES = clampInteger(process.env.MAX_RELAY_BRANCH_QUEUE_BYTES, 8 * 1024 * 1024, 512 * 1024, 256 * 1024 * 1024); const RELAY_BRANCH_PAUSE_BYTES = Math.floor(MAX_RELAY_BRANCH_QUEUE_BYTES / 2); const JPEG_SOI = Buffer.from([0xff, 0xd8]); const JPEG_EOI = Buffer.from([0xff, 0xd9]); const defaults = { fps: clampInteger(process.env.DEFAULT_FPS, 24, 1, 30), width: clampInteger(process.env.DEFAULT_FRAME_WIDTH, 960, 160, 1920), quality: clampInteger(process.env.JPEG_QUALITY, 7, 2, 18), audioBitrate: parseAudioBitrate(process.env.DEFAULT_AUDIO_BITRATE, '160k'), audioChannels: clampInteger(process.env.DEFAULT_AUDIO_CHANNELS, 2, 1, 2), audioSampleRate: clampInteger(process.env.DEFAULT_AUDIO_SAMPLE_RATE, 48000, 22050, 48000), }; const sessions = new Map(); const sourceTokens = new Map(); const playbacks = new Map(); let recentUrls = []; let recentWrite = Promise.resolve(); let favorites = []; let favoritesWrite = Promise.resolve(); app.disable('x-powered-by'); app.use(express.json({ limit: '256kb' })); app.use(express.static(publicDir)); app.get('/api/health', (_request, response) => { response.json({ ok: true, ffmpeg: FFMPEG_PATH, playbackConnectionMode: PLAYBACK_CONNECTION_MODE }); }); app.get('/api/recent-urls', (_request, response) => { response.json({ urls: recentUrls.map((item) => ({ url: item.url, displayUrl: redactSecrets(item.url), lastPlayedAt: item.lastPlayedAt, })), }); }); app.get('/api/favorites', (_request, response) => { response.json({ favorites: formatFavoritesPayload() }); }); app.put('/api/favorites', async (request, response) => { let nextFavorites; try { nextFavorites = parseFavoritesPayload(request.body?.favorites); } catch (error) { response.status(400).json({ error: error.message }); return; } favorites = nextFavorites; favoritesWrite = favoritesWrite.catch(() => {}).then(() => saveFavorites()); try { await favoritesWrite; } catch (error) { console.warn(`Failed to store favorites: ${error.message}`); response.status(500).json({ error: 'Failed to store favorites.' }); return; } response.json({ favorites: formatFavoritesPayload() }); }); app.post('/api/session', async (request, response) => { let url; try { url = parseStreamUrl(request.body?.url); } catch (error) { response.status(400).json({ error: error.message }); return; } const options = parsePlaybackOptions(request.body); const id = randomUUID(); const metadataStatus = METADATA_PROBE_ENABLED ? 'pending' : 'disabled'; sessions.set(id, { id, url, options, duration: null, metadataStatus, seekSeconds: 0, seekGeneration: 0, createdAt: Date.now(), lastUsedAt: Date.now(), }); logInfo(`session created id=${shortId(id)} mode=${getSessionPlaybackConnectionMode(sessions.get(id))} fps=${options.fps} width=${options.width} quality=${options.quality}`); if (METADATA_PROBE_ENABLED) { startSessionMetadataProbe(sessions.get(id)); } try { await addRecentUrl(url); } catch (error) { console.warn(`Failed to store recent URL: ${error.message}`); } response.status(201).json(formatSessionPayload(sessions.get(id))); }); app.get('/api/session/:sessionId', (request, response) => { const session = getSession(request.params.sessionId); if (!session) { response.status(404).json({ error: 'Unknown or expired session.' }); return; } response.json(formatSessionPayload(session)); }); app.post('/api/session/:sessionId/seek', (request, response) => { const session = getSession(request.params.sessionId); if (!session) { response.status(404).json({ error: 'Unknown or expired session.' }); return; } if (!isSessionSeekable(session)) { response.status(409).json({ error: 'This stream is not seekable.' }); return; } const requestedSeconds = Number(request.body?.time); if (!Number.isFinite(requestedSeconds)) { response.status(400).json({ error: 'A numeric seek time is required.' }); return; } const seekSeconds = clampNumber(requestedSeconds, 0, session.duration); const playback = playbacks.get(session.id); session.seekSeconds = seekSeconds; session.seekGeneration += 1; session.lastUsedAt = Date.now(); if (playback && !playback.closed) { playback.stop('client_seek'); if (playbacks.get(session.id) === playback) { playbacks.delete(session.id); } } logInfo(`session seeked id=${shortId(session.id)} time=${seekSeconds.toFixed(3)} duration=${session.duration.toFixed(3)}`); response.json(formatSessionPayload(session)); }); app.all('/_source/:token', async (request, response) => { const source = sourceTokens.get(request.params.token); const session = source ? getSession(source.sessionId) : null; const startedAt = Date.now(); const sourceLabel = source ? `${source.kind}:${shortId(source.sessionId)}` : `unknown:${request.params.token.slice(0, 8)}`; let bytes = 0; let upstreamEnded = false; let upstreamStatus = 'pending'; if (!session) { response.status(404).end(); return; } const controller = new AbortController(); const logClose = once(() => { logInfo(`source closed kind=${sourceLabel} status=${upstreamStatus} bytes=${bytes} upstreamEnded=${upstreamEnded} durationMs=${Date.now() - startedAt}`); }); const cleanup = once(() => { controller.abort(); logClose(); }); response.on('close', cleanup); try { const headers = new Headers(); const range = request.get('range'); if (range) { headers.set('range', range); } headers.set('accept-encoding', 'identity'); const upstream = await fetch(session.url, { method: request.method === 'HEAD' ? 'HEAD' : 'GET', headers, redirect: 'follow', signal: controller.signal, }); upstreamStatus = String(upstream.status); logInfo(`source connected kind=${sourceLabel} status=${upstream.status} contentType=${upstream.headers.get('content-type') ?? 'unknown'}`); response.status(upstream.status); copyUpstreamHeaders(upstream.headers, response); if (request.method === 'HEAD' || !upstream.body) { response.end(); return; } Readable.fromWeb(upstream.body).on('data', (chunk) => { bytes += chunk.length; }).on('end', () => { upstreamEnded = true; }).on('error', (error) => { if (!controller.signal.aborted) { logWarn(`source stream error kind=${sourceLabel} error=${oneLine(error.message)}`); } if (!response.destroyed) { response.destroy(error); } }).pipe(response); } catch (error) { if (controller.signal.aborted) { return; } logWarn(`source failed kind=${sourceLabel} error=${oneLine(error.message)}`); if (!response.headersSent) { response.status(502).json({ error: 'Failed to fetch source stream.' }); } else { response.destroy(error); } } }); app.get('/audio/:sessionId', (request, response) => { const session = getSession(request.params.sessionId); if (!session) { response.status(404).json({ error: 'Unknown or expired session.' }); return; } const playbackMode = getSessionPlaybackConnectionMode(session); if (playbackMode === 'single' || playbackMode === 'relay') { getOrCreatePlayback(session, playbackMode).attachAudio(request, response); return; } stopSharedPlaybackIfNeeded(session, playbackMode); streamSplitAudio(request, response, session); }); server.on('upgrade', (request, socket, head) => { const host = request.headers.host ?? 'localhost'; const { pathname } = new URL(request.url ?? '/', `http://${host}`); const match = pathname.match(/^\/frames\/([0-9a-f-]+)$/i); if (!match || !getSession(match[1])) { socket.write('HTTP/1.1 404 Not Found\r\n\r\n'); socket.destroy(); return; } wss.handleUpgrade(request, socket, head, (websocket) => { wss.emit('connection', websocket, request, match[1]); }); }); wss.on('connection', (websocket, _request, sessionId) => { const session = getSession(sessionId); if (!session) { websocket.close(1008, 'Unknown session'); return; } const playbackMode = getSessionPlaybackConnectionMode(session); if (playbackMode === 'single' || playbackMode === 'relay') { getOrCreatePlayback(session, playbackMode).attachFrames(websocket); return; } stopSharedPlaybackIfNeeded(session, playbackMode); streamSplitFrames(websocket, session); }); setInterval(() => { const now = Date.now(); for (const [id, session] of sessions) { if (now - session.lastUsedAt > SESSION_TTL_MS) { sessions.delete(id); } } for (const [token, source] of sourceTokens) { if (now - source.createdAt > SESSION_TTL_MS) { sourceTokens.delete(token); } } }, 60 * 1000).unref(); await Promise.all([ loadRecentUrls(), loadFavorites(), ]); server.listen(PORT, () => { console.log(`Frame stream app listening at http://localhost:${PORT} mode=${PLAYBACK_CONNECTION_MODE}`); }); function parseStreamUrl(value) { if (typeof value !== 'string' || value.trim().length === 0) { throw new Error('A stream URL is required.'); } if (value.length > 4096) { throw new Error('The stream URL is too long.'); } const parsed = new URL(value.trim()); if (!['http:', 'https:'].includes(parsed.protocol)) { throw new Error('Only http and https stream URLs are supported.'); } return parsed.toString(); } function parsePlaybackOptions(body) { return { fps: clampInteger(body?.fps, defaults.fps, 1, 30), width: clampInteger(body?.width, defaults.width, 160, 1920), quality: clampInteger(body?.quality, defaults.quality, 2, 18), audioBitrate: parseAudioBitrate(body?.audioBitrate, defaults.audioBitrate), audioChannels: clampInteger(body?.audioChannels, defaults.audioChannels, 1, 2), audioSampleRate: clampInteger(body?.audioSampleRate, defaults.audioSampleRate, 22050, 48000), }; } function parsePlaybackConnectionMode(value) { if (value === 'relay' || value === 'tee') { return 'relay'; } if (value === 'single' || value === 'unified') { return 'single'; } if (value === 'split' || value === 'separate' || value === undefined || value === '') { return 'split'; } console.warn(`Unknown PLAYBACK_CONNECTION_MODE "${value}", using split.`); return 'split'; } function clampInteger(value, fallback, min, max) { const parsed = Number(value); if (!Number.isFinite(parsed)) { return fallback; } return Math.min(max, Math.max(min, Math.round(parsed))); } function parseBoolean(value, fallback) { if (value === undefined || value === '') { return fallback; } if (['1', 'true', 'yes', 'on'].includes(String(value).toLowerCase())) { return true; } if (['0', 'false', 'no', 'off'].includes(String(value).toLowerCase())) { return false; } return fallback; } function clampNumber(value, min, max) { return Math.min(max, Math.max(min, value)); } function parseAudioBitrate(value, fallback = '64k') { if (typeof value !== 'string') { return fallback; } return /^\d{2,3}k$/i.test(value) ? value.toLowerCase() : fallback; } function formatSessionPayload(session) { return { id: session.id, options: session.options, duration: Number.isFinite(session.duration) ? session.duration : null, metadataStatus: session.metadataStatus, seekable: isSessionSeekable(session), seekSeconds: session.seekSeconds, seekGeneration: session.seekGeneration, }; } function isSessionSeekable(session) { return hasRecordedDuration(session); } function hasRecordedDuration(session) { return Number.isFinite(session.duration) && session.duration > 0; } function getSessionPlaybackConnectionMode(session) { if (PLAYBACK_CONNECTION_MODE === 'relay' && hasRecordedDuration(session)) { return 'split'; } return PLAYBACK_CONNECTION_MODE; } function startSessionMetadataProbe(session) { void probeSessionDuration(session).then((duration) => { const current = sessions.get(session.id); if (current !== session) { return; } session.duration = duration; session.metadataStatus = Number.isFinite(duration) ? 'ready' : 'unavailable'; }).catch((error) => { const current = sessions.get(session.id); if (current !== session) { return; } session.duration = null; session.metadataStatus = 'unavailable'; logWarn(`duration probe failed id=${shortId(session.id)} error=${oneLine(error.message)}`); }); } async function probeSessionDuration(session) { const source = createSourceInput(session.id, 'probe'); try { const output = await runFfprobe([ '-v', 'error', '-show_entries', 'format=duration', '-of', 'json', source.url, ]); const payload = JSON.parse(output); const duration = Number(payload?.format?.duration); return Number.isFinite(duration) && duration > 0 ? duration : null; } finally { source.release(); } } function runFfprobe(args) { return new Promise((resolve, reject) => { const ffprobe = spawn(FFPROBE_PATH, args, { stdio: ['ignore', 'pipe', 'pipe'], }); let stdout = ''; let stderr = ''; let timedOut = false; let settled = false; const finish = (callback) => { if (settled) { return; } settled = true; clearTimeout(timer); callback(); }; const timer = setTimeout(() => { timedOut = true; stopProcess(ffprobe); }, METADATA_PROBE_TIMEOUT_MS); timer.unref(); ffprobe.stdout.on('data', (chunk) => { stdout = appendTail(stdout, chunk); }); ffprobe.stderr.on('data', (chunk) => { stderr = appendTail(stderr, chunk); }); ffprobe.on('error', (error) => { finish(() => reject(error)); }); ffprobe.on('close', (code, signal) => { finish(() => { if (timedOut) { reject(new Error('ffprobe timed out.')); return; } if (code === 0) { resolve(stdout); return; } const detail = redactSecrets(stderr).trim(); reject(new Error(detail ? `ffprobe exited with code ${code}: ${detail}` : `ffprobe exited with code ${code ?? 'null'} signal ${signal ?? 'none'}.`)); }); }); }); } async function loadRecentUrls() { try { const raw = await fs.readFile(RECENT_URLS_PATH, 'utf8'); const parsed = JSON.parse(raw); const items = Array.isArray(parsed?.urls) ? parsed.urls : []; recentUrls = items .filter((item) => typeof item?.url === 'string' && typeof item?.lastPlayedAt === 'string') .slice(0, RECENT_URL_LIMIT); } catch (error) { if (error.code !== 'ENOENT') { console.warn(`Failed to read recent URLs: ${error.message}`); } recentUrls = []; } } async function loadFavorites() { try { const raw = await fs.readFile(FAVORITES_PATH, 'utf8'); const parsed = JSON.parse(raw); const items = Array.isArray(parsed?.favorites) ? parsed.favorites : []; favorites = items .map(normalizeFavorite) .filter(Boolean) .slice(0, FAVORITES_LIMIT); } catch (error) { if (error.code !== 'ENOENT') { console.warn(`Failed to read favorites: ${error.message}`); } favorites = []; } } async function addRecentUrl(url) { const lastPlayedAt = new Date().toISOString(); recentUrls = [ { url, lastPlayedAt }, ...recentUrls.filter((item) => item.url !== url), ].slice(0, RECENT_URL_LIMIT); recentWrite = recentWrite.catch(() => {}).then(() => saveRecentUrls()); await recentWrite; } async function saveRecentUrls() { const directory = path.dirname(RECENT_URLS_PATH); const temporaryPath = `${RECENT_URLS_PATH}.${process.pid}.tmp`; const payload = `${JSON.stringify({ urls: recentUrls }, null, 2)}\n`; await fs.mkdir(directory, { recursive: true }); await fs.writeFile(temporaryPath, payload, 'utf8'); await fs.rename(temporaryPath, RECENT_URLS_PATH); } async function saveFavorites() { const directory = path.dirname(FAVORITES_PATH); const temporaryPath = `${FAVORITES_PATH}.${process.pid}.tmp`; const payload = `${JSON.stringify({ favorites }, null, 2)}\n`; await fs.mkdir(directory, { recursive: true }); await fs.writeFile(temporaryPath, payload, 'utf8'); await fs.rename(temporaryPath, FAVORITES_PATH); } function formatFavoritesPayload() { return favorites.map((item) => ({ title: item.title, url: item.url, })); } function parseFavoritesPayload(value) { if (!Array.isArray(value)) { throw new Error('Favorites must be a list.'); } if (value.length > FAVORITES_LIMIT) { throw new Error(`Favorites are limited to ${FAVORITES_LIMIT} items.`); } return value.map((item, index) => parseFavorite(item, index)); } function parseFavorite(item, index) { if (!item || typeof item !== 'object') { throw new Error(`Favorite ${index + 1} is invalid.`); } const title = parseFavoriteTitle(item.title, index); let url; try { url = parseStreamUrl(item.url); } catch (error) { throw new Error(`Favorite ${index + 1}: ${error.message}`); } return { title, url }; } function normalizeFavorite(item) { if (!item || typeof item !== 'object') { return null; } try { return { title: parseFavoriteTitle(item.title, 0), url: parseStreamUrl(item.url), }; } catch { return null; } } function parseFavoriteTitle(value, index) { if (typeof value !== 'string') { throw new Error(`Favorite ${index + 1} needs a title.`); } const title = value.trim(); if (!title) { throw new Error(`Favorite ${index + 1} needs a title.`); } if (title.length > 120) { throw new Error(`Favorite ${index + 1} title is too long.`); } return title; } function getSession(sessionId) { const session = sessions.get(sessionId); if (!session) { return null; } session.lastUsedAt = Date.now(); return session; } function getOrCreatePlayback(session, playbackMode = getSessionPlaybackConnectionMode(session)) { const existing = playbacks.get(session.id); if (existing && !existing.closed && existing.mode === playbackMode) { return existing; } stopSharedPlaybackIfNeeded(session, playbackMode); const playback = playbackMode === 'relay' ? createRelayPlayback(session) : createPlayback(session); playbacks.set(session.id, playback); return playback; } function stopSharedPlaybackIfNeeded(session, playbackMode) { const existing = playbacks.get(session.id); if (!existing || existing.closed || existing.mode === playbackMode) { return; } existing.stop('playback_mode_changed'); if (playbacks.get(session.id) === existing) { playbacks.delete(session.id); } } function streamSplitAudio(request, response, session) { const worker = createAudioWorker(session); const releaseWorker = once(worker.release); const ffmpeg = spawnFfmpeg(worker.args); const logger = createFfmpegLogger('audio', session.id, ffmpeg.pid); const stderrTail = createFfmpegStderrTail(); let responseFinished = false; let stopReason = 'process_exit'; logger.start(); response.set({ 'Cache-Control': 'no-store', 'Content-Type': 'audio/mpeg', 'X-Content-Type-Options': 'nosniff', }); response.flushHeaders(); ffmpeg.stderr.on('data', (chunk) => { stderrTail.write(chunk); logger.stderr(chunk); }); ffmpeg.stdout.on('error', (error) => { logWarn(`audio output error kind=audio:${shortId(session.id)} error=${oneLine(error.message)}`); stopReason = 'audio_output_error'; stopProcess(ffmpeg); }); ffmpeg.stdout.pipe(response); ffmpeg.on('error', (error) => { stopReason = 'start_error'; releaseWorker(); logger.error(error); if (!response.writableEnded) { response.end(); } }); ffmpeg.on('close', (code, signal) => { stderrTail.flush(); releaseWorker(); logger.close(code, signal, stopReason, stderrTail.value); if (!response.writableEnded) { response.end(); } }); response.on('finish', () => { responseFinished = true; }); const cleanup = once(() => { if (responseFinished || response.writableEnded) { return; } stopReason = 'client_disconnect'; stopProcess(ffmpeg); }); request.on('close', cleanup); response.on('close', cleanup); } function streamSplitFrames(websocket, session) { const worker = createFrameWorker(session); const releaseWorker = once(worker.release); const ffmpeg = spawnFfmpeg(worker.args); const logger = createFfmpegLogger('frames', session.id, ffmpeg.pid); const stderrTail = createFfmpegStderrTail(); const { fps, quality, width } = session.options; const label = `frames:${shortId(session.id)}`; let frameIndex = 0; let skippedFrames = 0; let stopReason = 'process_exit'; let serverClosingWebSocket = false; let clientAudioTime = null; const frameConditions = createFrameConditionLogger(label); const frameSender = createLatestFrameSender(websocket, { onError: () => { stopReason = 'websocket_send_error'; stopProcess(ffmpeg); }, onSkip: () => { skippedFrames += 1; }, onQueue: (event) => { frameConditions.senderQueue(event); }, }); const frameParser = createJpegFrameParser(handleJpegFrame); logger.start(); sendJson(websocket, { type: 'ready', codec: 'jpeg', fps, quality, width, duration: session.duration, seekable: isSessionSeekable(session), seekSeconds: session.seekSeconds, }); ffmpeg.stdout.on('data', handleFrameData); ffmpeg.stdout.on('error', (error) => { logWarn(`frame output error kind=${label} error=${oneLine(error.message)}`); stopReason = 'frame_output_error'; stopProcess(ffmpeg); }); ffmpeg.stderr.on('data', (chunk) => { stderrTail.write(chunk); logger.stderr(chunk); }); ffmpeg.on('error', (error) => { stopReason = 'start_error'; releaseWorker(); logger.error(error); sendJson(websocket, { type: 'error', message: `Failed to start ffmpeg: ${error.message}`, }); websocket.close(1011, 'ffmpeg start failed'); }); ffmpeg.on('close', (code, signal) => { stderrTail.flush(); releaseWorker(); logger.close(code, signal, stopReason, stderrTail.value); if (isWebSocketOpen(websocket) && stopReason !== 'client_disconnect') { sendJson(websocket, { type: 'end', code, signal, skippedFrames, message: summarizeFfmpegExit(code, signal, stderrTail.value), }); serverClosingWebSocket = true; websocket.close(1000, 'ffmpeg exited'); } logInfo(`frames closed kind=${label} reason=${stopReason} frames=${frameIndex} skippedFrames=${skippedFrames}`); frameConditions.flush('close', true); }); websocket.on('close', () => { if (serverClosingWebSocket) { return; } stopReason = 'client_disconnect'; stopProcess(ffmpeg); }); websocket.on('error', () => { stopReason = 'websocket_error'; stopProcess(ffmpeg); }); websocket.on('message', (data, isBinary) => { clientAudioTime = handleFrameClientMessage(data, isBinary, label, clientAudioTime); }); function handleFrameData(chunk) { frameParser.write(chunk); } function handleJpegFrame(jpeg) { const timestamp = frameIndex / fps; frameIndex += 1; if (isFrameLateForClient(timestamp, clientAudioTime)) { skippedFrames += 1; frameConditions.clientClockSkip(timestamp, clientAudioTime); return true; } const sendResult = frameSender.send(timestamp, jpeg); if (sendResult === 'closed') { return false; } return true; } } function createRelayPlayback(session) { const { fps, quality, width } = session.options; const label = `relay:${shortId(session.id)}`; let audioResponse = null; let websocket = null; let audioFfmpeg = null; let frameFfmpeg = null; let audioInput = null; let frameInput = null; let audioLogger = null; let frameLogger = null; let sourceController = null; let sourceBytes = 0; let frameIndex = 0; let skippedFrames = 0; let stopReason = 'process_exit'; let started = false; let stopping = false; let closed = false; let sourceEnded = false; let audioClosed = false; let frameClosed = false; let frameExitCode = null; let frameExitSignal = null; let frameEndSent = false; let frameSender = null; let clientAudioTime = null; let audioResponseFinished = false; let serverClosingWebSocket = false; let readyTimer = null; const frameConditions = createFrameConditionLogger(label); const audioStderr = createFfmpegStderrTail(); const frameStderr = createFfmpegStderrTail(); const frameParser = createJpegFrameParser(handleJpegFrame); const playback = { mode: 'relay', get closed() { return closed; }, stop(reason = 'playback_stopped') { stop(reason); }, attachAudio(request, response) { if (closed) { response.status(410).end(); return; } if (audioResponse && !audioResponse.writableEnded) { response.status(409).json({ error: 'Audio stream already attached for this session.' }); return; } audioResponse = response; response.set({ 'Cache-Control': 'no-store', 'Content-Type': 'audio/mpeg', 'X-Content-Type-Options': 'nosniff', }); response.flushHeaders(); response.on('finish', () => { audioResponseFinished = true; }); const cleanup = once(() => { if (!closed && !audioResponseFinished && !response.writableEnded) { stop('client_disconnect'); } }); request.on('close', cleanup); response.on('close', cleanup); maybeStart(); }, attachFrames(nextWebsocket) { if (closed) { nextWebsocket.close(1011, 'Playback closed'); return; } if (websocket && websocket.readyState === WebSocket.OPEN) { nextWebsocket.close(1013, 'Frame stream already attached for this session'); return; } websocket = nextWebsocket; frameSender = createLatestFrameSender(websocket, { onError: (error) => { if (error && !closed) { stop('websocket_send_error'); } }, onSkip: () => { skippedFrames += 1; }, onQueue: (event) => { frameConditions.senderQueue(event); }, }); sendJson(websocket, { type: 'ready', codec: 'jpeg', fps, quality, width, duration: session.duration, seekable: isSessionSeekable(session), seekSeconds: session.seekSeconds, }); websocket.on('close', () => { if (!closed && !serverClosingWebSocket) { stop('client_disconnect'); } }); websocket.on('error', () => { if (!closed) { stop('websocket_error'); } }); websocket.on('message', (data, isBinary) => { clientAudioTime = handleFrameClientMessage(data, isBinary, label, clientAudioTime); }); maybeStart(); }, }; readyTimer = setTimeout(() => { if (!started && !closed) { stop('consumer_attach_timeout'); } }, PLAYBACK_READY_TIMEOUT_MS).unref(); return playback; function maybeStart() { if (started || closed || !audioResponse || !isWebSocketOpen(websocket)) { return; } started = true; clearReadyTimer(); audioFfmpeg = spawnFfmpeg(buildRelayAudioArgs(session), ['pipe', 'pipe', 'pipe']); frameFfmpeg = spawnFfmpeg(buildRelayFrameArgs(session), ['pipe', 'pipe', 'pipe']); audioLogger = createFfmpegLogger('relay-audio', session.id, audioFfmpeg.pid); frameLogger = createFfmpegLogger('relay-frames', session.id, frameFfmpeg.pid); audioInput = createRelayInputBranch(audioFfmpeg.stdin, handleRelayInputError); frameInput = createRelayInputBranch(frameFfmpeg.stdin, handleRelayInputError); audioLogger.start(); frameLogger.start(); audioFfmpeg.stdout.on('error', (error) => { logWarn(`audio output error kind=${label} error=${oneLine(error.message)}`); stop('audio_output_error'); }); audioFfmpeg.stdout.pipe(audioResponse); frameFfmpeg.stdout.on('error', (error) => { logWarn(`frame output error kind=${label} error=${oneLine(error.message)}`); stop('frame_output_error'); }); frameFfmpeg.stdout.on('data', handleFrameData); audioFfmpeg.stderr.on('data', (chunk) => { audioStderr.write(chunk); audioLogger.stderr(chunk); }); frameFfmpeg.stderr.on('data', (chunk) => { frameStderr.write(chunk); frameLogger.stderr(chunk); }); audioFfmpeg.on('error', (error) => { audioLogger.error(error); stop('audio_start_error'); }); frameFfmpeg.on('error', (error) => { frameLogger.error(error); stop('frame_start_error'); }); audioFfmpeg.on('close', (code, signal) => { audioClosed = true; audioStderr.flush(); audioLogger.close(code, signal, stopReason, audioStderr.value); maybeFinishRelay(); }); frameFfmpeg.on('close', (code, signal) => { frameClosed = true; frameExitCode = code; frameExitSignal = signal; frameStderr.flush(); frameLogger.close(code, signal, stopReason, frameStderr.value); sendFrameEndIfNeeded(); maybeFinishRelay(); }); void startRelaySource(); } async function startRelaySource() { const startedAt = Date.now(); let upstreamStatus = 'pending'; let upstreamEnded = false; sourceController = new AbortController(); try { const headers = new Headers(); headers.set('accept-encoding', 'identity'); const upstream = await fetch(session.url, { headers, redirect: 'follow', signal: sourceController.signal, }); upstreamStatus = String(upstream.status); logInfo(`relay source connected kind=${label} status=${upstream.status} contentType=${upstream.headers.get('content-type') ?? 'unknown'}`); if (!upstream.ok || !upstream.body) { throw new Error(`Source returned HTTP ${upstream.status}`); } for await (const chunk of Readable.fromWeb(upstream.body)) { if (closed || stopping) { return; } const buffer = toBufferView(chunk); sourceBytes += buffer.length; if (!audioInput.write(buffer) || !frameInput.write(buffer)) { stop('relay_input_queue_overflow'); return; } await waitForRelayCapacity([audioInput, frameInput], () => closed || stopping); } upstreamEnded = true; sourceEnded = true; audioInput.end(); frameInput.end(); } catch (error) { if (sourceController.signal.aborted || closed) { return; } logWarn(`relay source failed kind=${label} error=${oneLine(error.message)}`); stop('relay_source_error'); } finally { logInfo(`relay source closed kind=${label} status=${upstreamStatus} bytes=${sourceBytes} upstreamEnded=${upstreamEnded} durationMs=${Date.now() - startedAt}`); } } function handleRelayInputError(error) { if (!closed && !stopping) { logWarn(`relay input error kind=${label} error=${oneLine(error.message)}`); stop('relay_input_error'); } } function handleFrameData(chunk) { frameParser.write(chunk); } function handleJpegFrame(jpeg) { const timestamp = frameIndex / fps; frameIndex += 1; if (isFrameLateForClient(timestamp, clientAudioTime)) { skippedFrames += 1; frameConditions.clientClockSkip(timestamp, clientAudioTime); return true; } const sendResult = frameSender?.send(timestamp, jpeg) ?? 'closed'; if (sendResult === 'closed') { return false; } return true; } function stop(reason) { if (closed) { return; } stopReason = reason; stopping = true; sourceController?.abort(); audioInput?.destroy(); frameInput?.destroy(); if (isChildRunning(audioFfmpeg)) { stopProcess(audioFfmpeg); } if (isChildRunning(frameFfmpeg)) { stopProcess(frameFfmpeg); } if (!isChildRunning(audioFfmpeg) && !isChildRunning(frameFfmpeg)) { finishRelay(1000, 'relay stopped'); } } function maybeFinishRelay() { if (closed) { return; } if (!stopping && !sourceEnded) { stop('relay_process_exit'); return; } if (audioClosed && frameClosed) { finishRelay(1000, 'relay ended'); } } function sendFrameEndIfNeeded() { if (frameEndSent || !isWebSocketOpen(websocket)) { return; } frameEndSent = true; sendJson(websocket, { type: 'end', code: frameExitCode, signal: frameExitSignal, skippedFrames, message: summarizeFfmpegExit(frameExitCode, frameExitSignal, frameStderr.value), }); serverClosingWebSocket = true; websocket.close(1000, 'ffmpeg exited'); } function finishRelay(websocketCode, websocketReason) { if (closed) { return; } closed = true; clearReadyTimer(); sourceController?.abort(); audioInput?.destroy(); frameInput?.destroy(); if (playbacks.get(session.id) === playback) { playbacks.delete(session.id); } if (audioResponse && !audioResponse.writableEnded) { audioResponse.end(); } if (isWebSocketOpen(websocket) && frameEndSent) { serverClosingWebSocket = true; websocket.close(websocketCode, websocketReason); } else { sendFrameEndIfNeeded(); } logInfo(`relay closed kind=${label} reason=${stopReason} sourceBytes=${sourceBytes} frames=${frameIndex} skippedFrames=${skippedFrames} audioInputPeakBytes=${audioInput?.peakBytes ?? 0} frameInputPeakBytes=${frameInput?.peakBytes ?? 0}`); frameConditions.flush('close', true); } function clearReadyTimer() { if (readyTimer) { clearTimeout(readyTimer); readyTimer = null; } } } function createPlayback(session) { const { fps, quality, width } = session.options; const label = `playback:${shortId(session.id)}`; let audioResponse = null; let audioQueue = []; let audioQueueBytes = 0; let audioQueuePeakBytes = 0; let audioBackpressureCount = 0; let audioDrainCount = 0; let audioDrainPending = false; let websocket = null; let ffmpeg = null; let logger = null; let releaseSource = () => {}; let stderr = ''; let frameIndex = 0; let skippedFrames = 0; let stopReason = 'process_exit'; let started = false; let closed = false; let readyTimer = null; let frameSender = null; let clientAudioTime = null; const frameConditions = createFrameConditionLogger(label); const stderrTail = createFfmpegStderrTail(); const frameParser = createJpegFrameParser(handleJpegFrame); const playback = { mode: 'single', get closed() { return closed; }, stop(reason = 'playback_stopped') { stop(reason); }, attachAudio(request, response) { if (closed) { response.status(410).end(); return; } if (audioResponse && !audioResponse.writableEnded) { response.status(409).json({ error: 'Audio stream already attached for this session.' }); return; } audioResponse = response; response.set({ 'Cache-Control': 'no-store', 'Content-Type': 'audio/mpeg', 'X-Content-Type-Options': 'nosniff', }); response.flushHeaders(); const cleanup = once(() => { if (!closed) { stop('client_disconnect'); } }); request.on('close', cleanup); response.on('close', cleanup); maybeStart(); }, attachFrames(nextWebsocket) { if (closed) { nextWebsocket.close(1011, 'Playback closed'); return; } if (websocket && websocket.readyState === WebSocket.OPEN) { nextWebsocket.close(1013, 'Frame stream already attached for this session'); return; } websocket = nextWebsocket; frameSender = createLatestFrameSender(websocket, { onError: (error) => { if (error && !closed) { stop('websocket_send_error'); } }, onSkip: () => { skippedFrames += 1; }, onQueue: (event) => { frameConditions.senderQueue(event); }, }); sendJson(websocket, { type: 'ready', codec: 'jpeg', fps, quality, width, duration: session.duration, seekable: isSessionSeekable(session), seekSeconds: session.seekSeconds, }); websocket.on('close', () => { if (!closed) { stop('client_disconnect'); } }); websocket.on('error', () => { if (!closed) { stop('websocket_error'); } }); websocket.on('message', (data, isBinary) => { clientAudioTime = handleFrameClientMessage(data, isBinary, label, clientAudioTime); }); maybeStart(); }, }; readyTimer = setTimeout(() => { if (!started && !closed) { stop('consumer_attach_timeout'); } }, PLAYBACK_READY_TIMEOUT_MS).unref(); return playback; function maybeStart() { if (started || closed || !audioResponse || !isWebSocketOpen(websocket)) { return; } started = true; clearReadyTimer(); const source = createSourceInput(session.id, 'playback'); releaseSource = once(source.release); ffmpeg = spawnFfmpeg(buildPlaybackArgs(session, source.url), ['ignore', 'pipe', 'pipe', 'pipe']); logger = createFfmpegLogger('playback', session.id, ffmpeg.pid); logger.start(); const frameStream = ffmpeg.stdio[3]; ffmpeg.stdout.on('data', handleAudioData); ffmpeg.stdout.on('error', (error) => { logWarn(`audio output error kind=${label} error=${oneLine(error.message)}`); stop('audio_output_error'); }); frameStream.on('error', (error) => { logWarn(`frame output error kind=${label} error=${oneLine(error.message)}`); stop('frame_output_error'); }); frameStream.on('data', handleFrameData); ffmpeg.stderr.on('data', (chunk) => { stderrTail.write(chunk); logger.stderr(chunk); }); ffmpeg.on('error', (error) => { stopReason = 'start_error'; logger.error(error); finishPlayback(1011, `Failed to start ffmpeg: ${error.message}`, null, null); }); ffmpeg.on('close', (code, signal) => { stderrTail.flush(); stderr = stderrTail.value; logger.close(code, signal, stopReason, stderr); finishPlayback(1000, 'ffmpeg exited', code, signal); }); } function handleAudioData(chunk) { if (closed) { return; } if (!audioResponse || audioResponse.writableEnded) { stop('audio_response_closed'); return; } audioQueue.push(chunk); audioQueueBytes += chunk.length; audioQueuePeakBytes = Math.max(audioQueuePeakBytes, audioQueueBytes); if (audioQueueBytes > MAX_AUDIO_QUEUE_BYTES) { logWarn(`audio queue overflow kind=${label} bytes=${audioQueueBytes} maxBytes=${MAX_AUDIO_QUEUE_BYTES}`); stop('audio_queue_overflow'); return; } flushAudioQueue(); } function flushAudioQueue() { if (closed || !audioResponse || audioResponse.writableEnded) { return; } while (audioQueue.length > 0) { const chunk = audioQueue.shift(); audioQueueBytes -= chunk.length; let canContinue = false; try { canContinue = audioResponse.write(chunk); } catch (error) { logWarn(`audio response write failed kind=${label} error=${oneLine(error.message)}`); stop('audio_response_write_error'); return; } if (!canContinue) { audioBackpressureCount += 1; waitForAudioDrain(); return; } } } function waitForAudioDrain() { if (audioDrainPending || !audioResponse || audioResponse.writableEnded) { return; } audioDrainPending = true; audioResponse.once('drain', () => { audioDrainPending = false; audioDrainCount += 1; flushAudioQueue(); }); } function handleFrameData(chunk) { frameParser.write(chunk); } function handleJpegFrame(jpeg) { const timestamp = frameIndex / fps; frameIndex += 1; if (isFrameLateForClient(timestamp, clientAudioTime)) { skippedFrames += 1; frameConditions.clientClockSkip(timestamp, clientAudioTime); return true; } const sendResult = frameSender?.send(timestamp, jpeg) ?? 'closed'; if (sendResult === 'closed') { return false; } return true; } function stop(reason) { stopReason = reason; if (ffmpeg && ffmpeg.exitCode === null && ffmpeg.signalCode === null) { stopProcess(ffmpeg); return; } finishPlayback(1000, 'playback stopped', null, null); } function finishPlayback(websocketCode, websocketReason, ffmpegCode, ffmpegSignal) { if (closed) { return; } closed = true; clearReadyTimer(); releaseSource(); if (playbacks.get(session.id) === playback) { playbacks.delete(session.id); } audioQueue = []; audioQueueBytes = 0; if (audioResponse && !audioResponse.writableEnded) { audioResponse.end(); } if (isWebSocketOpen(websocket)) { sendJson(websocket, { type: 'end', code: ffmpegCode, signal: ffmpegSignal, skippedFrames, message: summarizeFfmpegExit(ffmpegCode, ffmpegSignal, stderr), }); websocket.close(websocketCode, websocketReason); } logInfo(`playback closed kind=${label} reason=${stopReason} frames=${frameIndex} skippedFrames=${skippedFrames} audioQueuePeakBytes=${audioQueuePeakBytes} audioBackpressureCount=${audioBackpressureCount} audioDrainCount=${audioDrainCount}`); frameConditions.flush('close', true); } function clearReadyTimer() { if (readyTimer) { clearTimeout(readyTimer); readyTimer = null; } } } function isWebSocketOpen(websocket) { return websocket?.readyState === WebSocket.OPEN; } function createJpegFrameParser(onFrame) { let collecting = false; let pendingMarkerByte = false; let parts = []; let byteLength = 0; return { write(chunk) { const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk); let offset = 0; while (offset < buffer.length) { if (!collecting) { if (pendingMarkerByte && buffer[offset] === 0xd8) { collecting = true; pendingMarkerByte = false; parts = [JPEG_SOI]; byteLength = JPEG_SOI.length; offset += 1; } else { const start = buffer.indexOf(JPEG_SOI, offset); if (start === -1) { pendingMarkerByte = buffer[buffer.length - 1] === 0xff; return true; } collecting = true; pendingMarkerByte = false; parts = []; byteLength = 0; offset = start; } } if (pendingMarkerByte) { pendingMarkerByte = false; if (buffer[offset] === 0xd9) { appendFramePart(buffer.subarray(offset, offset + 1)); offset += 1; if (!emitFrame()) { return false; } continue; } } const end = buffer.indexOf(JPEG_EOI, offset); if (end === -1) { appendFramePart(buffer.subarray(offset)); pendingMarkerByte = buffer[buffer.length - 1] === 0xff; return true; } appendFramePart(buffer.subarray(offset, end + JPEG_EOI.length)); offset = end + JPEG_EOI.length; if (!emitFrame()) { return false; } } return true; }, }; function appendFramePart(part) { if (part.length === 0) { return; } parts.push(part); byteLength += part.length; } function emitFrame() { const frame = { parts, byteLength }; collecting = false; pendingMarkerByte = false; parts = []; byteLength = 0; return onFrame(frame) !== false; } } function createLatestFrameSender(websocket, { onError = () => {}, onSkip = () => {}, onQueue = () => {} } = {}) { let sending = false; let latestFrame = null; let pumpTimer = null; return { send(timestamp, jpeg) { if (!isWebSocketOpen(websocket)) { return 'closed'; } const packetBytes = 8 + getJpegFrameByteLength(jpeg); if (sending) { queueLatestFrame(timestamp, jpeg, { reason: 'send_in_progress', packetBytes, bufferedAmount: websocket.bufferedAmount, }); return 'queued'; } if (isWebSocketOverFrameBudget(websocket, packetBytes)) { queueLatestFrame(timestamp, jpeg, { reason: 'websocket_buffer_budget', packetBytes, bufferedAmount: websocket.bufferedAmount, }); return 'queued'; } return sendNow(timestamp, jpeg, packetBytes); }, }; function queueLatestFrame(timestamp, jpeg, event) { const replacing = Boolean(latestFrame); if (replacing) { onSkip(); } latestFrame = { timestamp, jpeg }; onQueue({ ...event, replacing }); schedulePump(); } function sendNow(timestamp, jpeg, packetBytes = 8 + getJpegFrameByteLength(jpeg)) { if (!isWebSocketOpen(websocket)) { return 'closed'; } const packet = Buffer.allocUnsafe(packetBytes); packet.writeDoubleLE(timestamp, 0); copyJpegFrame(jpeg, packet, 8); sending = true; try { websocket.send(packet, { binary: true, compress: false }, (error) => { sending = false; if (error) { onError(error); return; } pumpLatestFrame(); }); } catch (error) { sending = false; onError(error); return 'closed'; } return 'sent'; } function pumpLatestFrame() { if (!latestFrame || sending) { return; } if (!isWebSocketOpen(websocket)) { latestFrame = null; return; } const packetBytes = 8 + getJpegFrameByteLength(latestFrame.jpeg); if (isWebSocketOverFrameBudget(websocket, packetBytes)) { schedulePump(); return; } const frame = latestFrame; latestFrame = null; sendNow(frame.timestamp, frame.jpeg, packetBytes); } function schedulePump() { if (pumpTimer) { return; } pumpTimer = setTimeout(() => { pumpTimer = null; pumpLatestFrame(); }, 10); pumpTimer.unref?.(); } } function handleFrameClientMessage(data, isBinary, label, currentClientAudioTime) { const message = parseFrameClientMessage(data, isBinary); if (!message) { return currentClientAudioTime; } if (message.type === 'clock') { return message.currentTime; } if (message.type === 'telemetry') { logClientTelemetry(label, message); } return currentClientAudioTime; } function parseFrameClientMessage(data, isBinary) { if (isBinary) { return null; } let message; try { const raw = typeof data === 'string' ? data : data.toString('utf8'); message = JSON.parse(raw); } catch { return null; } if (message?.type === 'clock') { const currentTime = Number(message.currentTime); if (!Number.isFinite(currentTime)) { return null; } return { type: 'clock', currentTime: clampNumber(currentTime, 0, Number.MAX_SAFE_INTEGER), }; } if (message?.type === 'telemetry') { return { type: 'telemetry', reason: sanitizeTelemetryText(message.reason, 'periodic'), currentTime: finiteTelemetryNumber(message.currentTime), paused: Boolean(message.paused), readyState: finiteTelemetryNumber(message.readyState), pendingFrames: finiteTelemetryNumber(message.pendingFrames), decodedFrames: finiteTelemetryNumber(message.decodedFrames), paintedFrames: finiteTelemetryNumber(message.paintedFrames), lastFramePacketAgeMs: finiteTelemetryNumber(message.lastFramePacketAgeMs), lastFramePaintAgeMs: finiteTelemetryNumber(message.lastFramePaintAgeMs), hidden: Boolean(message.hidden), online: Boolean(message.online), counters: sanitizeTelemetryMap(message.counters), max: sanitizeTelemetryMap(message.max), }; } return null; } function isFrameLateForClient(timestamp, clientAudioTime) { return Number.isFinite(clientAudioTime) && timestamp < clientAudioTime - CLIENT_CLOCK_FRAME_LATE_GRACE_SECONDS; } function createFrameConditionLogger(label) { let counters = {}; let max = {}; let lastLoggedAt = 0; return { senderQueue(event) { incrementCounter(`senderQueue_${event.reason}`); if (event.replacing) { incrementCounter('senderReplacedLatest'); } recordMax('wsBufferedBytes', event.bufferedAmount); recordMax('framePacketBytes', event.packetBytes); flush('periodic'); }, clientClockSkip(timestamp, clientAudioTime) { incrementCounter('clientClockSkips'); recordMax('clientClockLagMs', (clientAudioTime - timestamp) * 1000); flush('periodic'); }, flush, }; function incrementCounter(name, count = 1) { counters[name] = (counters[name] ?? 0) + count; } function recordMax(name, value) { if (!Number.isFinite(value)) { return; } max[name] = Math.max(max[name] ?? 0, Math.round(value)); } function flush(reason = 'periodic', force = false) { const now = Date.now(); if (!force && now - lastLoggedAt < FRAME_CONDITION_LOG_INTERVAL_MS) { return; } if (Object.keys(counters).length === 0 && Object.keys(max).length === 0) { return; } logInfo(`frame conditions kind=${label} reason=${reason} counters=${formatTelemetryMap(counters)} max=${formatTelemetryMap(max)}`); counters = {}; max = {}; lastLoggedAt = now; } } function logClientTelemetry(label, telemetry) { logInfo( `client telemetry kind=${label}` + ` reason=${telemetry.reason}` + ` currentTime=${formatTelemetryNumber(telemetry.currentTime)}` + ` paused=${telemetry.paused}` + ` readyState=${formatTelemetryNumber(telemetry.readyState)}` + ` pendingFrames=${formatTelemetryNumber(telemetry.pendingFrames)}` + ` decodedFrames=${formatTelemetryNumber(telemetry.decodedFrames)}` + ` paintedFrames=${formatTelemetryNumber(telemetry.paintedFrames)}` + ` lastFramePacketAgeMs=${formatTelemetryNumber(telemetry.lastFramePacketAgeMs)}` + ` lastFramePaintAgeMs=${formatTelemetryNumber(telemetry.lastFramePaintAgeMs)}` + ` hidden=${telemetry.hidden}` + ` online=${telemetry.online}` + ` counters=${formatTelemetryMap(telemetry.counters)}` + ` max=${formatTelemetryMap(telemetry.max)}`, ); } function sanitizeTelemetryMap(value) { if (!value || typeof value !== 'object' || Array.isArray(value)) { return {}; } const sanitized = {}; for (const [key, rawValue] of Object.entries(value).slice(0, 40)) { const name = sanitizeTelemetryText(key, '').replace(/[^a-zA-Z0-9_.-]/g, '_').slice(0, 80); const number = finiteTelemetryNumber(rawValue); if (name && number !== null) { sanitized[name] = number; } } return sanitized; } function sanitizeTelemetryText(value, fallback) { const text = typeof value === 'string' ? value : fallback; return oneLine(text).replace(/"/g, '').slice(0, 120); } function finiteTelemetryNumber(value) { const number = Number(value); return Number.isFinite(number) ? number : null; } function formatTelemetryMap(value) { const entries = Object.entries(value); if (entries.length === 0) { return 'none'; } return entries.map(([key, entryValue]) => `${key}=${formatTelemetryNumber(entryValue)}`).join(','); } function formatTelemetryNumber(value) { if (!Number.isFinite(value)) { return 'null'; } return Math.round(value * 1000) / 1000; } function isWebSocketOverFrameBudget(websocket, packetBytes) { return websocket.bufferedAmount > 0 && websocket.bufferedAmount + packetBytes > MAX_WS_BUFFER_BYTES; } function getJpegFrameByteLength(jpeg) { return Buffer.isBuffer(jpeg) ? jpeg.length : jpeg.byteLength; } function copyJpegFrame(jpeg, packet, offset) { if (Buffer.isBuffer(jpeg)) { jpeg.copy(packet, offset); return; } let writeOffset = offset; for (const part of jpeg.parts) { part.copy(packet, writeOffset); writeOffset += part.length; } } function toBufferView(chunk) { if (Buffer.isBuffer(chunk)) { return chunk; } if (chunk instanceof ArrayBuffer) { return Buffer.from(chunk); } return Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength); } function createSourceInput(sessionId, kind) { const token = randomUUID(); sourceTokens.set(token, { sessionId, kind, createdAt: Date.now() }); return { url: `http://127.0.0.1:${getListeningPort()}/_source/${token}`, release: () => sourceTokens.delete(token), }; } function getListeningPort() { const address = server.address(); return typeof address === 'object' && address ? address.port : PORT; } function createAudioWorker(session) { const source = createSourceInput(session.id, 'audio'); return { args: buildAudioArgs(session, source.url), release: source.release, }; } function createFrameWorker(session) { const source = createSourceInput(session.id, 'frames'); return { args: buildFrameArgs(session, source.url), release: source.release, }; } function buildRelayAudioArgs(session) { return buildAudioArgs(session, 'pipe:0', { seekable: false, startTime: 0 }); } function buildRelayFrameArgs(session) { return buildFrameArgs(session, 'pipe:0', { seekable: false, startTime: 0 }); } function buildPlaybackArgs(session, inputUrl) { return [ ...buildInputArgs(inputUrl, { startTime: session.seekSeconds }), '-map', '0:a:0?', '-vn', '-ac', String(session.options.audioChannels), '-ar', String(session.options.audioSampleRate), '-codec:a', 'libmp3lame', '-b:a', session.options.audioBitrate, '-f', 'mp3', 'pipe:1', '-map', '0:v:0', ...buildFrameOutputArgs(session, 'pipe:3'), ]; } function buildAudioArgs(session, inputUrl, inputOptions) { return [ ...buildInputArgs(inputUrl, { ...inputOptions, startTime: inputOptions?.startTime ?? session.seekSeconds }), '-map', '0:a:0?', '-vn', '-ac', String(session.options.audioChannels), '-ar', String(session.options.audioSampleRate), '-codec:a', 'libmp3lame', '-b:a', session.options.audioBitrate, '-f', 'mp3', 'pipe:1', ]; } function buildFrameArgs(session, inputUrl, inputOptions) { return [ ...buildInputArgs(inputUrl, { ...inputOptions, startTime: inputOptions?.startTime ?? session.seekSeconds }), '-map', '0:v:0', ...buildFrameOutputArgs(session, 'pipe:1'), ]; } function buildInputArgs(inputUrl, { seekable = true, startTime = 0 } = {}) { const args = [ '-hide_banner', '-nostdin', '-loglevel', FFMPEG_LOG_LEVEL, '-nostats', ]; if (seekable) { args.push('-seekable', startTime > 0 ? '1' : FFMPEG_INPUT_SEEKABLE); } if (startTime > 0) { args.push('-ss', formatFfmpegSeconds(startTime)); } if (FFMPEG_HTTP_RECONNECT && isHttpInputUrl(inputUrl)) { args.push( '-reconnect', '1', '-reconnect_on_network_error', '1', '-reconnect_streamed', '1', '-reconnect_delay_max', String(FFMPEG_HTTP_RECONNECT_DELAY_MAX), '-reconnect_max_retries', String(FFMPEG_HTTP_RECONNECT_MAX_RETRIES), ); if (FFMPEG_HTTP_RECONNECT_ON_HTTP_ERROR) { args.push('-reconnect_on_http_error', FFMPEG_HTTP_RECONNECT_ON_HTTP_ERROR); } } args.push('-re', '-i', inputUrl); return args; } function isHttpInputUrl(inputUrl) { return inputUrl.startsWith('http://') || inputUrl.startsWith('https://'); } function formatFfmpegSeconds(seconds) { return clampNumber(seconds, 0, Number.MAX_SAFE_INTEGER).toFixed(3); } function buildFrameOutputArgs(session, outputUrl) { const { fps, quality, width } = session.options; const videoFilter = `fps=${fps},scale=w='min(${width},iw)':h=-2:flags=bicubic:out_range=pc,format=yuvj420p`; return [ '-an', '-vf', videoFilter, '-codec:v', 'mjpeg', '-pix_fmt', 'yuvj420p', '-color_range', 'pc', '-q:v', String(quality), '-f', 'image2pipe', outputUrl, ]; } function spawnFfmpeg(args, stdio = ['ignore', 'pipe', 'pipe']) { return spawn(FFMPEG_PATH, args, { stdio, }); } function createFfmpegLogger(kind, sessionId, pid) { const startedAt = Date.now(); const label = `${kind}:${shortId(sessionId)}`; const lineLogger = createLineLogger((line) => { if (shouldSuppressFfmpegLogLine(line)) { return; } logWarn(`ffmpeg stderr kind=${label} pid=${pid ?? 'unknown'} line="${oneLine(redactSecrets(line))}"`); }); return { start() { logInfo(`ffmpeg started kind=${label} pid=${pid ?? 'unknown'} loglevel=${FFMPEG_LOG_LEVEL}`); }, stderr(chunk) { lineLogger.write(chunk); }, error(error) { logWarn(`ffmpeg start error kind=${label} pid=${pid ?? 'unknown'} error=${oneLine(error.message)}`); }, close(code, signal, reason, stderr) { lineLogger.flush(); const level = reason === 'client_disconnect' || code === 0 || signal === 'SIGTERM' ? 'info' : 'warn'; const tail = redactSecrets(stderr).trim(); const tailText = tail ? ` stderrTail="${oneLine(tail)}"` : ''; log(level, `ffmpeg exited kind=${label} pid=${pid ?? 'unknown'} code=${code ?? 'null'} signal=${signal ?? 'none'} reason=${reason} durationMs=${Date.now() - startedAt}${tailText}`); }, }; } function shouldSuppressFfmpegLogLine(line) { return line.includes('deprecated pixel format used, make sure you did set range correctly'); } function createFfmpegStderrTail() { let value = ''; const lineLogger = createLineLogger((line) => { if (!shouldSuppressFfmpegLogLine(line)) { value = appendTail(value, `${line}\n`); } }); return { get value() { return value; }, write(chunk) { lineLogger.write(chunk); }, flush() { lineLogger.flush(); }, }; } function createRelayInputBranch(stream, onError) { let accepting = true; let ending = false; let drainPending = false; let streamPendingBytes = 0; let queue = []; let queueBytes = 0; let peakBytes = 0; let waiters = []; stream.on('drain', () => { drainPending = false; streamPendingBytes = 0; updatePeakBytes(); notifyWaiters(); flush(); }); stream.on('error', (error) => { accepting = false; streamPendingBytes = 0; notifyWaiters(); onError(error); }); stream.on('close', () => { accepting = false; streamPendingBytes = 0; notifyWaiters(); }); return { get queueBytes() { return getPendingBytes(); }, get peakBytes() { return peakBytes; }, write(chunk) { if (!accepting || stream.destroyed) { return false; } if (queue.length === 0 && !drainPending) { return writeNow(chunk); } queue.push(chunk); queueBytes += chunk.length; updatePeakBytes(); notifyWaiters(); return getPendingBytes() <= MAX_RELAY_BRANCH_QUEUE_BYTES; }, end() { accepting = false; ending = true; flush(); }, destroy() { accepting = false; ending = false; queue = []; queueBytes = 0; streamPendingBytes = 0; notifyWaiters(); if (!stream.destroyed) { stream.destroy(); } }, waitForCapacity() { if (getPendingBytes() <= RELAY_BRANCH_PAUSE_BYTES || !accepting || stream.destroyed) { return Promise.resolve(); } return new Promise((resolve) => { waiters.push(resolve); }); }, }; function writeNow(chunk) { try { if (!stream.write(chunk)) { drainPending = true; streamPendingBytes += chunk.length; updatePeakBytes(); } notifyWaiters(); return true; } catch (error) { accepting = false; streamPendingBytes = 0; notifyWaiters(); onError(error); return false; } } function flush() { while (queue.length > 0 && !drainPending && !stream.destroyed) { const chunk = queue.shift(); queueBytes -= chunk.length; if (!writeNow(chunk)) { return; } notifyWaiters(); } if (ending && queue.length === 0 && !drainPending && !stream.destroyed) { stream.end(); } } function notifyWaiters() { if (getPendingBytes() > RELAY_BRANCH_PAUSE_BYTES && accepting && !stream.destroyed) { return; } const currentWaiters = waiters; waiters = []; for (const resolve of currentWaiters) { resolve(); } } function getPendingBytes() { return queueBytes + streamPendingBytes; } function updatePeakBytes() { peakBytes = Math.max(peakBytes, getPendingBytes()); } } async function waitForRelayCapacity(branches, shouldStop) { while (!shouldStop()) { const blockedBranches = branches.filter((branch) => branch.queueBytes > RELAY_BRANCH_PAUSE_BYTES); if (blockedBranches.length === 0) { return; } await Promise.race(blockedBranches.map((branch) => branch.waitForCapacity())); } } function createLineLogger(callback) { let buffer = ''; return { write(chunk) { buffer += chunk.toString('utf8'); const lines = buffer.split(/\r?\n/); buffer = lines.pop() ?? ''; for (const line of lines) { if (line.trim()) { callback(line); } } }, flush() { if (buffer.trim()) { callback(buffer); } buffer = ''; }, }; } function copyUpstreamHeaders(upstreamHeaders, response) { for (const header of [ 'accept-ranges', 'content-length', 'content-range', 'content-type', 'etag', 'last-modified', ]) { const value = upstreamHeaders.get(header); if (value) { response.setHeader(header, value); } } response.setHeader('cache-control', 'no-store'); } function sendJson(websocket, payload) { if (websocket.readyState === WebSocket.OPEN) { websocket.send(JSON.stringify(payload)); } } function appendTail(current, chunk) { const next = current + chunk.toString('utf8'); return next.length > 4000 ? next.slice(-4000) : next; } function shortId(id) { return id.slice(0, 8); } function oneLine(value) { return String(value).replace(/\s+/g, ' ').trim().slice(0, 1200); } function logInfo(message) { log('info', message); } function logWarn(message) { log('warn', message); } function log(level, message) { const output = `${new Date().toISOString()} ${message}`; if (level === 'warn') { console.warn(output); return; } console.log(output); } function summarizeFfmpegExit(code, signal, stderr) { if (signal) { return `ffmpeg stopped by ${signal}.`; } if (code === 0 || code === null) { return 'Frame stream ended.'; } const detail = redactSecrets(stderr).trim(); return detail ? `ffmpeg exited with code ${code}: ${detail}` : `ffmpeg exited with code ${code}.`; } function redactSecrets(text) { return text.replace(/([?&](?:api_key|apikey|access_token|token|key)=)[^&\s]+/gi, '$1[redacted]'); } function isChildRunning(child) { return Boolean(child && child.exitCode === null && child.signalCode === null); } function stopProcess(child) { if (!child || child.exitCode !== null || child.signalCode !== null) { return; } child.kill('SIGTERM'); setTimeout(() => { if (child.exitCode === null && child.signalCode === null) { child.kill('SIGKILL'); } }, 1500).unref(); } function once(callback) { let called = false; return (...args) => { if (called) { return; } called = true; callback(...args); }; }