From 13b1d768dc49a0efa070bc9089e2a7586ccff2b6 Mon Sep 17 00:00:00 2001 From: James Magahern Date: Mon, 4 May 2026 00:00:34 -0700 Subject: [PATCH] sequential frames support - Server now has configurable MAX_WS_BUFFER_BYTES defaulting to 2097152, and skips JPEG frames when the WebSocket is backed up instead of queueing stale frames in ws (server/index.js:30, server/index.js:1439). - Browser frame handling now decodes frames sequentially, drops late frames against the audio clock, caps pending/decoded queues, and draws only the latest due frame per animation tick (public/app.js:280, public/app.js:381). - Relay/split normal EOF closes are no longer mislabeled as client_disconnect, which should make logs around ffmpeg decode warnings less misleading (server/index.js:797, server/ index.js:1071). - Documented MAX_WS_BUFFER_BYTES in README, Compose, and AGENTS. --- AGENTS.md | 1 + README.md | 2 + docker-compose-example.yml | 1 + public/app.js | 157 ++++++++++++++++++++++++++++++++----- server/index.js | 109 +++++++++++++++++-------- 5 files changed, 218 insertions(+), 52 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 1ad3f5f..8a4ab83 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -170,6 +170,7 @@ Runtime: - `PLAYBACK_CONNECTION_MODE`: `split`, `relay`, or `single`. - `RECENT_URLS_PATH`: recent URL JSON path. - `RECENT_URL_LIMIT`: recent URL count, default `12`. +- `MAX_WS_BUFFER_BYTES`: server-side WebSocket JPEG frame backlog cap, default `2097152`. - `MAX_AUDIO_QUEUE_BYTES`: single-mode audio output queue cap, default `16777216`. - `MAX_RELAY_BRANCH_QUEUE_BYTES`: relay per-branch compressed-input queue cap, default `16777216`. diff --git a/README.md b/README.md index 638c096..6c1997e 100644 --- a/README.md +++ b/README.md @@ -49,6 +49,8 @@ docker logs -f frame-stream-player The app sets `FFMPEG_INPUT_SEEKABLE=0` by default so `ffmpeg` reads stream inputs sequentially and avoids extra HTTP range connections. If a specific VOD file requires seeking for metadata, set `FFMPEG_INPUT_SEEKABLE=-1` to restore ffmpeg's automatic behavior. +JPEG frames are dropped when the browser WebSocket falls behind instead of letting stale frames queue indefinitely. Tune the server-side backlog cap with `MAX_WS_BUFFER_BYTES`; the default is `2097152`. + In single mode, audio output from `ffmpeg` is buffered before it is written to the browser so short HTTP backpressure pauses are less likely to stall frame generation. Tune the cap with `MAX_AUDIO_QUEUE_BYTES`; the default is `16777216`. Playback uses `PLAYBACK_CONNECTION_MODE=split` by default. The Docker Compose example sets `PLAYBACK_CONNECTION_MODE=relay` so IPTV-style streams can be tested with one upstream connection. diff --git a/docker-compose-example.yml b/docker-compose-example.yml index a730733..4da89bb 100644 --- a/docker-compose-example.yml +++ b/docker-compose-example.yml @@ -15,6 +15,7 @@ services: PLAYBACK_CONNECTION_MODE: relay FFMPEG_LOG_LEVEL: warning FFMPEG_INPUT_SEEKABLE: "0" + MAX_WS_BUFFER_BYTES: "2097152" MAX_AUDIO_QUEUE_BYTES: "16777216" MAX_RELAY_BRANCH_QUEUE_BYTES: "16777216" RECENT_URLS_PATH: /app/data/recent-urls.json diff --git a/public/app.js b/public/app.js index caa8629..94ca6c2 100644 --- a/public/app.js +++ b/public/app.js @@ -23,11 +23,18 @@ const elements = { }; const context = elements.canvas.getContext('2d', { alpha: false }); +const FRAME_LATE_GRACE_SECONDS = 0.25; +const MAX_PENDING_FRAME_QUEUE_SECONDS = 2; +const MAX_DECODED_FRAME_QUEUE_SECONDS = 3; +const MIN_PENDING_FRAME_QUEUE = 12; +const MIN_DECODED_FRAME_QUEUE = 24; const state = { generation: 0, session: null, websocket: null, + pendingFrames: [], + decodingFrames: false, frames: [], currentBitmap: null, raf: 0, @@ -270,23 +277,20 @@ async function playAudio() { } } -async function handleFramePacket(packet, generation) { +function handleFramePacket(packet, generation) { if (!(packet instanceof ArrayBuffer) || packet.byteLength <= 8) { return; } const timestamp = new DataView(packet, 0, 8).getFloat64(0, true); - const blob = new Blob([packet.slice(8)], { type: 'image/jpeg' }); - const bitmap = await decodeImage(blob); - if (generation !== state.generation) { - releaseImage(bitmap); + if (generation !== state.generation || isLateFrame(timestamp)) { return; } - state.frames.push({ timestamp, bitmap }); - state.frameCount += 1; - trimFrameQueue(); + state.pendingFrames.push({ timestamp, jpeg: packet.slice(8), generation }); + trimPendingFrameQueue(); + void pumpFrameDecodeQueue(); } function handleControlMessage(rawMessage) { @@ -332,26 +336,34 @@ function drawReadyFrames() { return; } + dropLateDecodedFrames(); + const frameLeadSeconds = 1 / Math.max(1, state.session.options.fps); const targetTime = elements.audio.currentTime + frameLeadSeconds; - let drew = false; + let frameToDraw = null; while (state.frames.length > 0 && state.frames[0].timestamp <= targetTime) { const frame = state.frames.shift(); - if (state.currentBitmap) { - releaseImage(state.currentBitmap); + if (frameToDraw) { + releaseImage(frameToDraw.bitmap); } - state.currentBitmap = frame.bitmap; - drawBitmap(frame.bitmap); - drew = true; + frameToDraw = frame; } - if (drew) { - elements.loader.hidden = true; - clearPlayerMessage(); + if (!frameToDraw) { + return; } + + if (state.currentBitmap) { + releaseImage(state.currentBitmap); + } + + state.currentBitmap = frameToDraw.bitmap; + drawBitmap(frameToDraw.bitmap); + elements.loader.hidden = true; + clearPlayerMessage(); } function drawBitmap(bitmap) { @@ -366,9 +378,70 @@ function drawBitmap(bitmap) { context.drawImage(bitmap, 0, 0, elements.canvas.width, elements.canvas.height); } +async function pumpFrameDecodeQueue() { + if (state.decodingFrames) { + return; + } + + state.decodingFrames = true; + + try { + while (state.pendingFrames.length > 0) { + dropLatePendingFrames(); + + const frame = state.pendingFrames.shift(); + + if (!frame) { + return; + } + + if (frame.generation !== state.generation || isLateFrame(frame.timestamp)) { + continue; + } + + let bitmap; + + try { + bitmap = await decodeImage(new Blob([frame.jpeg], { type: 'image/jpeg' })); + } catch { + continue; + } + + if (frame.generation !== state.generation || isLateFrame(frame.timestamp)) { + releaseImage(bitmap); + continue; + } + + state.frames.push({ timestamp: frame.timestamp, bitmap }); + state.frameCount += 1; + trimFrameQueue(); + } + } finally { + state.decodingFrames = false; + + if (state.pendingFrames.length > 0) { + window.setTimeout(() => { + void pumpFrameDecodeQueue(); + }, 0); + } + } +} + +function trimPendingFrameQueue() { + dropLatePendingFrames(); + + const maxQueuedFrames = getFrameQueueLimit(MAX_PENDING_FRAME_QUEUE_SECONDS, MIN_PENDING_FRAME_QUEUE); + const overflow = state.pendingFrames.length - maxQueuedFrames; + + if (overflow > 0) { + state.pendingFrames.splice(0, overflow); + } +} + function trimFrameQueue() { - const fps = state.session?.options.fps ?? 24; - const maxQueuedFrames = Math.max(60, Math.ceil(fps * 8)); + dropLateDecodedFrames(); + + const maxQueuedFrames = getFrameQueueLimit(MAX_DECODED_FRAME_QUEUE_SECONDS, MIN_DECODED_FRAME_QUEUE); const overflow = state.frames.length - maxQueuedFrames; if (overflow <= 0) { @@ -382,6 +455,50 @@ function trimFrameQueue() { } } +function dropLatePendingFrames() { + let removeCount = 0; + + while (removeCount < state.pendingFrames.length && isLateFrame(state.pendingFrames[removeCount].timestamp)) { + removeCount += 1; + } + + if (removeCount > 0) { + state.pendingFrames.splice(0, removeCount); + } +} + +function dropLateDecodedFrames() { + let removeCount = 0; + + while (removeCount < state.frames.length && isLateFrame(state.frames[removeCount].timestamp)) { + removeCount += 1; + } + + if (removeCount <= 0) { + return; + } + + const removed = state.frames.splice(0, removeCount); + + for (const frame of removed) { + releaseImage(frame.bitmap); + } +} + +function getFrameQueueLimit(seconds, minimum) { + const fps = state.session?.options.fps ?? 24; + return Math.max(minimum, Math.ceil(fps * seconds)); +} + +function isLateFrame(timestamp) { + if (!state.session || state.isSeeking || elements.audio.paused || elements.audio.readyState === 0) { + return false; + } + + const currentTime = Number.isFinite(elements.audio.currentTime) ? elements.audio.currentTime : 0; + return timestamp < currentTime - FRAME_LATE_GRACE_SECONDS; +} + function stopSession({ showEntry: shouldShowEntry = true } = {}) { state.generation += 1; state.session = null; @@ -421,6 +538,8 @@ function stopSession({ showEntry: shouldShowEntry = true } = {}) { } function clearFrameQueue() { + state.pendingFrames = []; + for (const frame of state.frames) { releaseImage(frame.bitmap); } diff --git a/server/index.js b/server/index.js index 2c0c4b0..6d7896a 100644 --- a/server/index.js +++ b/server/index.js @@ -27,7 +27,7 @@ const RECENT_URL_LIMIT = clampInteger(process.env.RECENT_URL_LIMIT, 12, 1, 50); const SESSION_TTL_MS = 60 * 60 * 1000; const METADATA_PROBE_TIMEOUT_MS = 8 * 1000; const PLAYBACK_READY_TIMEOUT_MS = 15 * 1000; -const MAX_WS_BUFFER_BYTES = 12 * 1024 * 1024; +const MAX_WS_BUFFER_BYTES = clampInteger(process.env.MAX_WS_BUFFER_BYTES, 2 * 1024 * 1024, 128 * 1024, 64 * 1024 * 1024); const MAX_AUDIO_QUEUE_BYTES = clampInteger(process.env.MAX_AUDIO_QUEUE_BYTES, 16 * 1024 * 1024, 256 * 1024, 128 * 1024 * 1024); const MAX_RELAY_BRANCH_QUEUE_BYTES = clampInteger(process.env.MAX_RELAY_BRANCH_QUEUE_BYTES, 16 * 1024 * 1024, 512 * 1024, 256 * 1024 * 1024); const RELAY_BRANCH_PAUSE_BYTES = Math.floor(MAX_RELAY_BRANCH_QUEUE_BYTES / 2); @@ -551,6 +551,7 @@ function streamSplitAudio(request, response, session) { const ffmpeg = spawnFfmpeg(worker.args); const logger = createFfmpegLogger('audio', session.id, ffmpeg.pid); const stderrTail = createFfmpegStderrTail(); + let responseFinished = false; let stopReason = 'process_exit'; logger.start(); @@ -592,7 +593,15 @@ function streamSplitAudio(request, response, session) { } }); + response.on('finish', () => { + responseFinished = true; + }); + const cleanup = once(() => { + if (responseFinished || response.writableEnded) { + return; + } + stopReason = 'client_disconnect'; stopProcess(ffmpeg); }); @@ -612,6 +621,7 @@ function streamSplitFrames(websocket, session) { let frameIndex = 0; let skippedFrames = 0; let stopReason = 'process_exit'; + let serverClosingWebSocket = false; logger.start(); @@ -662,6 +672,7 @@ function streamSplitFrames(websocket, session) { skippedFrames, message: summarizeFfmpegExit(code, signal, stderrTail.value), }); + serverClosingWebSocket = true; websocket.close(1000, 'ffmpeg exited'); } @@ -669,6 +680,10 @@ function streamSplitFrames(websocket, session) { }); websocket.on('close', () => { + if (serverClosingWebSocket) { + return; + } + stopReason = 'client_disconnect'; stopProcess(ffmpeg); }); @@ -705,20 +720,18 @@ function streamSplitFrames(websocket, session) { return; } - if (websocket.bufferedAmount > MAX_WS_BUFFER_BYTES) { - skippedFrames += 1; - continue; + const sendResult = sendFramePacket(websocket, timestamp, jpeg, () => { + stopReason = 'websocket_send_error'; + stopProcess(ffmpeg); + }); + + if (sendResult === 'closed') { + return; } - const packet = Buffer.allocUnsafe(8 + jpeg.length); - packet.writeDoubleLE(timestamp, 0); - jpeg.copy(packet, 8); - websocket.send(packet, { binary: true }, (error) => { - if (error) { - stopReason = 'websocket_send_error'; - stopProcess(ffmpeg); - } - }); + if (sendResult === 'skipped') { + skippedFrames += 1; + } } } } @@ -749,6 +762,8 @@ function createRelayPlayback(session) { let frameExitCode = null; let frameExitSignal = null; let frameEndSent = false; + let audioResponseFinished = false; + let serverClosingWebSocket = false; let readyTimer = null; const audioStderr = createFfmpegStderrTail(); const frameStderr = createFfmpegStderrTail(); @@ -779,8 +794,12 @@ function createRelayPlayback(session) { }); response.flushHeaders(); + response.on('finish', () => { + audioResponseFinished = true; + }); + const cleanup = once(() => { - if (!closed) { + if (!closed && !audioResponseFinished && !response.writableEnded) { stop('client_disconnect'); } }); @@ -814,7 +833,7 @@ function createRelayPlayback(session) { }); websocket.on('close', () => { - if (!closed) { + if (!closed && !serverClosingWebSocket) { stop('client_disconnect'); } }); @@ -994,19 +1013,19 @@ function createRelayPlayback(session) { return; } - if (websocket.bufferedAmount > MAX_WS_BUFFER_BYTES) { - skippedFrames += 1; - continue; - } - - const packet = Buffer.allocUnsafe(8 + jpeg.length); - packet.writeDoubleLE(timestamp, 0); - jpeg.copy(packet, 8); - websocket.send(packet, { binary: true }, (error) => { + const sendResult = sendFramePacket(websocket, timestamp, jpeg, (error) => { if (error && !closed) { stop('websocket_send_error'); } }); + + if (sendResult === 'closed') { + return; + } + + if (sendResult === 'skipped') { + skippedFrames += 1; + } } } @@ -1062,6 +1081,7 @@ function createRelayPlayback(session) { skippedFrames, message: summarizeFfmpegExit(frameExitCode, frameExitSignal, frameStderr.value), }); + serverClosingWebSocket = true; websocket.close(1000, 'ffmpeg exited'); } @@ -1084,6 +1104,7 @@ function createRelayPlayback(session) { } if (isWebSocketOpen(websocket) && frameEndSent) { + serverClosingWebSocket = true; websocket.close(websocketCode, websocketReason); } else { sendFrameEndIfNeeded(); @@ -1344,19 +1365,19 @@ function createPlayback(session) { return; } - if (websocket.bufferedAmount > MAX_WS_BUFFER_BYTES) { - skippedFrames += 1; - continue; - } - - const packet = Buffer.allocUnsafe(8 + jpeg.length); - packet.writeDoubleLE(timestamp, 0); - jpeg.copy(packet, 8); - websocket.send(packet, { binary: true }, (error) => { + const sendResult = sendFramePacket(websocket, timestamp, jpeg, (error) => { if (error && !closed) { stop('websocket_send_error'); } }); + + if (sendResult === 'closed') { + return; + } + + if (sendResult === 'skipped') { + skippedFrames += 1; + } } } @@ -1415,6 +1436,28 @@ function isWebSocketOpen(websocket) { return websocket?.readyState === WebSocket.OPEN; } +function sendFramePacket(websocket, timestamp, jpeg, onError) { + if (!isWebSocketOpen(websocket)) { + return 'closed'; + } + + const packetBytes = 8 + jpeg.length; + + if (websocket.bufferedAmount > 0 && websocket.bufferedAmount + packetBytes > MAX_WS_BUFFER_BYTES) { + return 'skipped'; + } + + const packet = Buffer.allocUnsafe(packetBytes); + packet.writeDoubleLE(timestamp, 0); + jpeg.copy(packet, 8); + websocket.send(packet, { binary: true }, (error) => { + if (error) { + onError(error); + } + }); + return 'sent'; +} + function createSourceInput(sessionId, kind) { const token = randomUUID(); sourceTokens.set(token, { sessionId, kind, createdAt: Date.now() });