From 518268642674422ac774f3e0c6894de864ddcfa6 Mon Sep 17 00:00:00 2001 From: James Magahern Date: Fri, 1 May 2026 22:41:51 -0700 Subject: [PATCH] single ffmpeg stream --- README.md | 8 + docker-compose-example.yml | 2 + server/index.js | 533 +++++++++++++++++++++++++------------ 3 files changed, 367 insertions(+), 176 deletions(-) diff --git a/README.md b/README.md index d842ee8..70d33c2 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,14 @@ The app uses CPU decoding by default, so no video device is required. The compos Recently played URLs are stored globally by the backend. In Docker Compose, they are persisted in the `frame-stream-data` named volume. +`ffmpeg` worker lifecycle, stderr warnings/errors, and source proxy open/close events are written to stdout/stderr, so they appear in `docker logs`. For more detail while debugging a stream, set `FFMPEG_LOG_LEVEL=info` in Docker Compose and run: + +```sh +docker logs -f frame-stream-player +``` + +The app sets `FFMPEG_INPUT_SEEKABLE=0` by default so `ffmpeg` reads stream inputs sequentially and avoids extra HTTP range connections. If a specific VOD file requires seeking for metadata, set `FFMPEG_INPUT_SEEKABLE=-1` to restore ffmpeg's automatic behavior. + ## Tuning The UI intentionally hides these settings, but the backend still supports them through `POST /api/session`. diff --git a/docker-compose-example.yml b/docker-compose-example.yml index 1c591e9..abceb2b 100644 --- a/docker-compose-example.yml +++ b/docker-compose-example.yml @@ -9,6 +9,8 @@ services: environment: PORT: "3000" NODE_ENV: production + FFMPEG_LOG_LEVEL: warning + FFMPEG_INPUT_SEEKABLE: "0" RECENT_URLS_PATH: /app/data/recent-urls.json extra_hosts: - "host.docker.internal:host-gateway" diff --git a/server/index.js b/server/index.js index 5a29dc6..a2d760a 100644 --- a/server/index.js +++ b/server/index.js @@ -18,9 +18,12 @@ const wss = new WebSocketServer({ noServer: true }); const PORT = Number(process.env.PORT ?? 3000); const FFMPEG_PATH = process.env.FFMPEG_PATH ?? 'ffmpeg'; +const FFMPEG_LOG_LEVEL = process.env.FFMPEG_LOG_LEVEL ?? 'warning'; +const FFMPEG_INPUT_SEEKABLE = process.env.FFMPEG_INPUT_SEEKABLE ?? '0'; const RECENT_URLS_PATH = process.env.RECENT_URLS_PATH ?? path.join(__dirname, '..', 'data', 'recent-urls.json'); const RECENT_URL_LIMIT = clampInteger(process.env.RECENT_URL_LIMIT, 12, 1, 50); const SESSION_TTL_MS = 60 * 60 * 1000; +const PLAYBACK_READY_TIMEOUT_MS = 15 * 1000; const MAX_WS_BUFFER_BYTES = 12 * 1024 * 1024; const JPEG_SOI = Buffer.from([0xff, 0xd8]); const JPEG_EOI = Buffer.from([0xff, 0xd9]); @@ -34,6 +37,7 @@ const defaults = { const sessions = new Map(); const sourceTokens = new Map(); +const playbacks = new Map(); let recentUrls = []; let recentWrite = Promise.resolve(); @@ -88,6 +92,11 @@ app.post('/api/session', async (request, response) => { app.all('/_source/:token', async (request, response) => { const source = sourceTokens.get(request.params.token); const session = source ? getSession(source.sessionId) : null; + const startedAt = Date.now(); + const sourceLabel = source ? `${source.kind}:${shortId(source.sessionId)}` : `unknown:${request.params.token.slice(0, 8)}`; + let bytes = 0; + let upstreamEnded = false; + let upstreamStatus = 'pending'; if (!session) { response.status(404).end(); @@ -95,7 +104,13 @@ app.all('/_source/:token', async (request, response) => { } const controller = new AbortController(); - const cleanup = once(() => controller.abort()); + const logClose = once(() => { + logInfo(`source closed kind=${sourceLabel} status=${upstreamStatus} bytes=${bytes} upstreamEnded=${upstreamEnded} durationMs=${Date.now() - startedAt}`); + }); + const cleanup = once(() => { + controller.abort(); + logClose(); + }); response.on('close', cleanup); try { @@ -115,6 +130,9 @@ app.all('/_source/:token', async (request, response) => { signal: controller.signal, }); + upstreamStatus = String(upstream.status); + logInfo(`source connected kind=${sourceLabel} status=${upstream.status} contentType=${upstream.headers.get('content-type') ?? 'unknown'}`); + response.status(upstream.status); copyUpstreamHeaders(upstream.headers, response); @@ -123,7 +141,15 @@ app.all('/_source/:token', async (request, response) => { return; } - Readable.fromWeb(upstream.body).on('error', (error) => { + Readable.fromWeb(upstream.body).on('data', (chunk) => { + bytes += chunk.length; + }).on('end', () => { + upstreamEnded = true; + }).on('error', (error) => { + if (!controller.signal.aborted) { + logWarn(`source stream error kind=${sourceLabel} error=${oneLine(error.message)}`); + } + if (!response.destroyed) { response.destroy(error); } @@ -133,7 +159,7 @@ app.all('/_source/:token', async (request, response) => { return; } - console.error(`Source proxy failed: ${error.message}`); + logWarn(`source failed kind=${sourceLabel} error=${oneLine(error.message)}`); if (!response.headersSent) { response.status(502).json({ error: 'Failed to fetch source stream.' }); @@ -151,51 +177,7 @@ app.get('/audio/:sessionId', (request, response) => { return; } - const worker = createAudioWorker(session); - const releaseWorker = once(worker.release); - const ffmpeg = spawnFfmpeg(worker.args); - let stderr = ''; - - response.set({ - 'Cache-Control': 'no-store', - 'Content-Type': 'audio/mpeg', - 'X-Content-Type-Options': 'nosniff', - }); - response.flushHeaders(); - - ffmpeg.stderr.on('data', (chunk) => { - stderr = appendTail(stderr, chunk); - }); - - ffmpeg.stdout.pipe(response); - - ffmpeg.on('error', (error) => { - releaseWorker(); - console.error(`Failed to start ffmpeg audio worker: ${error.message}`); - if (!response.writableEnded) { - response.end(); - } - }); - - ffmpeg.on('close', (code, signal) => { - releaseWorker(); - - if (code && code !== 255) { - console.warn(`ffmpeg audio worker exited with code ${code}: ${redactSecrets(stderr)}`); - } - - if (signal) { - console.warn(`ffmpeg audio worker stopped by ${signal}`); - } - - if (!response.writableEnded) { - response.end(); - } - }); - - const cleanup = once(() => stopProcess(ffmpeg)); - request.on('close', cleanup); - response.on('close', cleanup); + getOrCreatePlayback(session).attachAudio(request, response); }); server.on('upgrade', (request, socket, head) => { @@ -222,100 +204,7 @@ wss.on('connection', (websocket, _request, sessionId) => { return; } - const worker = createFrameWorker(session); - const releaseWorker = once(worker.release); - const ffmpeg = spawnFfmpeg(worker.args); - const { fps, quality, width } = session.options; - let buffer = Buffer.alloc(0); - let frameIndex = 0; - let skippedFrames = 0; - let stderr = ''; - let closedByClient = false; - - sendJson(websocket, { - type: 'ready', - codec: 'jpeg', - fps, - quality, - width, - }); - - ffmpeg.stdout.on('data', (chunk) => { - buffer = Buffer.concat([buffer, chunk]); - - for (;;) { - const start = buffer.indexOf(JPEG_SOI); - - if (start === -1) { - buffer = Buffer.alloc(0); - return; - } - - const end = buffer.indexOf(JPEG_EOI, start + JPEG_SOI.length); - - if (end === -1) { - buffer = start === 0 ? buffer : buffer.subarray(start); - return; - } - - const jpeg = buffer.subarray(start, end + JPEG_EOI.length); - buffer = buffer.subarray(end + JPEG_EOI.length); - const timestamp = frameIndex / fps; - frameIndex += 1; - - if (websocket.readyState !== WebSocket.OPEN) { - return; - } - - if (websocket.bufferedAmount > MAX_WS_BUFFER_BYTES) { - skippedFrames += 1; - continue; - } - - const packet = Buffer.allocUnsafe(8 + jpeg.length); - packet.writeDoubleLE(timestamp, 0); - jpeg.copy(packet, 8); - websocket.send(packet, { binary: true }); - } - }); - - ffmpeg.stderr.on('data', (chunk) => { - stderr = appendTail(stderr, chunk); - }); - - ffmpeg.on('error', (error) => { - releaseWorker(); - sendJson(websocket, { - type: 'error', - message: `Failed to start ffmpeg: ${error.message}`, - }); - websocket.close(1011, 'ffmpeg start failed'); - }); - - ffmpeg.on('close', (code, signal) => { - releaseWorker(); - - if (websocket.readyState === WebSocket.OPEN && !closedByClient) { - sendJson(websocket, { - type: 'end', - code, - signal, - skippedFrames, - message: summarizeFfmpegExit(code, signal, stderr), - }); - websocket.close(1000, 'ffmpeg exited'); - } - }); - - websocket.on('close', () => { - closedByClient = true; - stopProcess(ffmpeg); - }); - - websocket.on('error', () => { - closedByClient = true; - stopProcess(ffmpeg); - }); + getOrCreatePlayback(session).attachFrames(websocket); }); setInterval(() => { @@ -435,27 +324,246 @@ function getSession(sessionId) { return session; } -function createAudioWorker(session) { - const source = createSourceInput(session.id); +function getOrCreatePlayback(session) { + const existing = playbacks.get(session.id); - return { - args: buildAudioArgs(session, source.url), - release: source.release, - }; + if (existing && !existing.closed) { + return existing; + } + + const playback = createPlayback(session); + playbacks.set(session.id, playback); + return playback; } -function createFrameWorker(session) { - const source = createSourceInput(session.id); +function createPlayback(session) { + const { fps, quality, width } = session.options; + const label = `playback:${shortId(session.id)}`; + let audioResponse = null; + let websocket = null; + let ffmpeg = null; + let logger = null; + let releaseSource = () => {}; + let stderr = ''; + let frameBuffer = Buffer.alloc(0); + let frameIndex = 0; + let skippedFrames = 0; + let stopReason = 'process_exit'; + let started = false; + let closed = false; + let readyTimer = null; - return { - args: buildFrameArgs(session, source.url), - release: source.release, + const playback = { + get closed() { + return closed; + }, + attachAudio(request, response) { + if (closed) { + response.status(410).end(); + return; + } + + if (audioResponse && !audioResponse.writableEnded) { + response.status(409).json({ error: 'Audio stream already attached for this session.' }); + return; + } + + audioResponse = response; + response.set({ + 'Cache-Control': 'no-store', + 'Content-Type': 'audio/mpeg', + 'X-Content-Type-Options': 'nosniff', + }); + response.flushHeaders(); + + const cleanup = once(() => { + if (!closed) { + stop('client_disconnect'); + } + }); + + request.on('close', cleanup); + response.on('close', cleanup); + maybeStart(); + }, + attachFrames(nextWebsocket) { + if (closed) { + nextWebsocket.close(1011, 'Playback closed'); + return; + } + + if (websocket && websocket.readyState === WebSocket.OPEN) { + nextWebsocket.close(1013, 'Frame stream already attached for this session'); + return; + } + + websocket = nextWebsocket; + + sendJson(websocket, { + type: 'ready', + codec: 'jpeg', + fps, + quality, + width, + }); + + websocket.on('close', () => { + if (!closed) { + stop('client_disconnect'); + } + }); + + websocket.on('error', () => { + if (!closed) { + stop('websocket_error'); + } + }); + + maybeStart(); + }, }; + + readyTimer = setTimeout(() => { + if (!started && !closed) { + stop('consumer_attach_timeout'); + } + }, PLAYBACK_READY_TIMEOUT_MS).unref(); + + return playback; + + function maybeStart() { + if (started || closed || !audioResponse || !isWebSocketOpen(websocket)) { + return; + } + + started = true; + clearReadyTimer(); + + const source = createSourceInput(session.id, 'playback'); + releaseSource = once(source.release); + ffmpeg = spawnFfmpeg(buildPlaybackArgs(session, source.url), ['ignore', 'pipe', 'pipe', 'pipe']); + logger = createFfmpegLogger('playback', session.id, ffmpeg.pid); + logger.start(); + + const frameStream = ffmpeg.stdio[3]; + + ffmpeg.stdout.pipe(audioResponse); + frameStream.on('data', handleFrameData); + + ffmpeg.stderr.on('data', (chunk) => { + stderr = appendTail(stderr, chunk); + logger.stderr(chunk); + }); + + ffmpeg.on('error', (error) => { + stopReason = 'start_error'; + logger.error(error); + finishPlayback(1011, `Failed to start ffmpeg: ${error.message}`, null, null); + }); + + ffmpeg.on('close', (code, signal) => { + logger.close(code, signal, stopReason, stderr); + finishPlayback(1000, 'ffmpeg exited', code, signal); + }); + } + + function handleFrameData(chunk) { + frameBuffer = Buffer.concat([frameBuffer, chunk]); + + for (;;) { + const start = frameBuffer.indexOf(JPEG_SOI); + + if (start === -1) { + frameBuffer = Buffer.alloc(0); + return; + } + + const end = frameBuffer.indexOf(JPEG_EOI, start + JPEG_SOI.length); + + if (end === -1) { + frameBuffer = start === 0 ? frameBuffer : frameBuffer.subarray(start); + return; + } + + const jpeg = frameBuffer.subarray(start, end + JPEG_EOI.length); + frameBuffer = frameBuffer.subarray(end + JPEG_EOI.length); + const timestamp = frameIndex / fps; + frameIndex += 1; + + if (!isWebSocketOpen(websocket)) { + return; + } + + if (websocket.bufferedAmount > MAX_WS_BUFFER_BYTES) { + skippedFrames += 1; + continue; + } + + const packet = Buffer.allocUnsafe(8 + jpeg.length); + packet.writeDoubleLE(timestamp, 0); + jpeg.copy(packet, 8); + websocket.send(packet, { binary: true }, (error) => { + if (error && !closed) { + stop('websocket_send_error'); + } + }); + } + } + + function stop(reason) { + stopReason = reason; + + if (ffmpeg && ffmpeg.exitCode === null && ffmpeg.signalCode === null) { + stopProcess(ffmpeg); + return; + } + + finishPlayback(1000, 'playback stopped', null, null); + } + + function finishPlayback(websocketCode, websocketReason, ffmpegCode, ffmpegSignal) { + if (closed) { + return; + } + + closed = true; + clearReadyTimer(); + releaseSource(); + playbacks.delete(session.id); + + if (audioResponse && !audioResponse.writableEnded) { + audioResponse.end(); + } + + if (isWebSocketOpen(websocket)) { + sendJson(websocket, { + type: 'end', + code: ffmpegCode, + signal: ffmpegSignal, + skippedFrames, + message: summarizeFfmpegExit(ffmpegCode, ffmpegSignal, stderr), + }); + websocket.close(websocketCode, websocketReason); + } + + logInfo(`playback closed kind=${label} reason=${stopReason} frames=${frameIndex} skippedFrames=${skippedFrames}`); + } + + function clearReadyTimer() { + if (readyTimer) { + clearTimeout(readyTimer); + readyTimer = null; + } + } } -function createSourceInput(sessionId) { +function isWebSocketOpen(websocket) { + return websocket?.readyState === WebSocket.OPEN; +} + +function createSourceInput(sessionId, kind) { const token = randomUUID(); - sourceTokens.set(token, { sessionId, createdAt: Date.now() }); + sourceTokens.set(token, { sessionId, kind, createdAt: Date.now() }); return { url: `http://127.0.0.1:${getListeningPort()}/_source/${token}`, @@ -468,18 +576,24 @@ function getListeningPort() { return typeof address === 'object' && address ? address.port : PORT; } -function buildAudioArgs(session, inputUrl) { +function buildPlaybackArgs(session, inputUrl) { + const { fps, quality, width } = session.options; + const videoFilter = `fps=${fps},scale='min(${width},iw)':-2:flags=bicubic`; + return [ '-hide_banner', '-nostdin', '-loglevel', - 'warning', + FFMPEG_LOG_LEVEL, + '-nostats', + '-seekable', + FFMPEG_INPUT_SEEKABLE, '-re', '-i', inputUrl, - '-vn', '-map', '0:a:0?', + '-vn', '-ac', '2', '-ar', @@ -491,21 +605,8 @@ function buildAudioArgs(session, inputUrl) { '-f', 'mp3', 'pipe:1', - ]; -} - -function buildFrameArgs(session, inputUrl) { - const { fps, quality, width } = session.options; - const videoFilter = `fps=${fps},scale='min(${width},iw)':-2:flags=bicubic`; - - return [ - '-hide_banner', - '-nostdin', - '-loglevel', - 'warning', - '-re', - '-i', - inputUrl, + '-map', + '0:v:0', '-an', '-vf', videoFilter, @@ -515,16 +616,69 @@ function buildFrameArgs(session, inputUrl) { String(quality), '-f', 'image2pipe', - 'pipe:1', + 'pipe:3', ]; } -function spawnFfmpeg(args) { +function spawnFfmpeg(args, stdio = ['ignore', 'pipe', 'pipe']) { return spawn(FFMPEG_PATH, args, { - stdio: ['ignore', 'pipe', 'pipe'], + stdio, }); } +function createFfmpegLogger(kind, sessionId, pid) { + const startedAt = Date.now(); + const label = `${kind}:${shortId(sessionId)}`; + const lineLogger = createLineLogger((line) => { + logWarn(`ffmpeg stderr kind=${label} pid=${pid ?? 'unknown'} line="${oneLine(redactSecrets(line))}"`); + }); + + return { + start() { + logInfo(`ffmpeg started kind=${label} pid=${pid ?? 'unknown'} loglevel=${FFMPEG_LOG_LEVEL}`); + }, + stderr(chunk) { + lineLogger.write(chunk); + }, + error(error) { + logWarn(`ffmpeg start error kind=${label} pid=${pid ?? 'unknown'} error=${oneLine(error.message)}`); + }, + close(code, signal, reason, stderr) { + lineLogger.flush(); + const level = reason === 'client_disconnect' || code === 0 || signal === 'SIGTERM' ? 'info' : 'warn'; + const tail = redactSecrets(stderr).trim(); + const tailText = tail ? ` stderrTail="${oneLine(tail)}"` : ''; + + log(level, `ffmpeg exited kind=${label} pid=${pid ?? 'unknown'} code=${code ?? 'null'} signal=${signal ?? 'none'} reason=${reason} durationMs=${Date.now() - startedAt}${tailText}`); + }, + }; +} + +function createLineLogger(callback) { + let buffer = ''; + + return { + write(chunk) { + buffer += chunk.toString('utf8'); + const lines = buffer.split(/\r?\n/); + buffer = lines.pop() ?? ''; + + for (const line of lines) { + if (line.trim()) { + callback(line); + } + } + }, + flush() { + if (buffer.trim()) { + callback(buffer); + } + + buffer = ''; + }, + }; +} + function copyUpstreamHeaders(upstreamHeaders, response) { for (const header of [ 'accept-ranges', @@ -555,6 +709,33 @@ function appendTail(current, chunk) { return next.length > 4000 ? next.slice(-4000) : next; } +function shortId(id) { + return id.slice(0, 8); +} + +function oneLine(value) { + return String(value).replace(/\s+/g, ' ').trim().slice(0, 1200); +} + +function logInfo(message) { + log('info', message); +} + +function logWarn(message) { + log('warn', message); +} + +function log(level, message) { + const output = `${new Date().toISOString()} ${message}`; + + if (level === 'warn') { + console.warn(output); + return; + } + + console.log(output); +} + function summarizeFfmpegExit(code, signal, stderr) { if (signal) { return `ffmpeg stopped by ${signal}.`;