single ffmpeg stream

This commit is contained in:
2026-05-01 22:41:51 -07:00
parent e51aeae54e
commit 5182686426
3 changed files with 367 additions and 176 deletions

View File

@@ -41,6 +41,14 @@ The app uses CPU decoding by default, so no video device is required. The compos
Recently played URLs are stored globally by the backend. In Docker Compose, they are persisted in the `frame-stream-data` named volume.
`ffmpeg` worker lifecycle, stderr warnings/errors, and source proxy open/close events are written to stdout/stderr, so they appear in `docker logs`. For more detail while debugging a stream, set `FFMPEG_LOG_LEVEL=info` in Docker Compose and run:
```sh
docker logs -f frame-stream-player
```
The app sets `FFMPEG_INPUT_SEEKABLE=0` by default so `ffmpeg` reads stream inputs sequentially and avoids extra HTTP range connections. If a specific VOD file requires seeking for metadata, set `FFMPEG_INPUT_SEEKABLE=-1` to restore ffmpeg's automatic behavior.
## Tuning
The UI intentionally hides these settings, but the backend still supports them through `POST /api/session`.

View File

@@ -9,6 +9,8 @@ services:
environment:
PORT: "3000"
NODE_ENV: production
FFMPEG_LOG_LEVEL: warning
FFMPEG_INPUT_SEEKABLE: "0"
RECENT_URLS_PATH: /app/data/recent-urls.json
extra_hosts:
- "host.docker.internal:host-gateway"

View File

@@ -18,9 +18,12 @@ const wss = new WebSocketServer({ noServer: true });
const PORT = Number(process.env.PORT ?? 3000);
const FFMPEG_PATH = process.env.FFMPEG_PATH ?? 'ffmpeg';
const FFMPEG_LOG_LEVEL = process.env.FFMPEG_LOG_LEVEL ?? 'warning';
const FFMPEG_INPUT_SEEKABLE = process.env.FFMPEG_INPUT_SEEKABLE ?? '0';
const RECENT_URLS_PATH = process.env.RECENT_URLS_PATH ?? path.join(__dirname, '..', 'data', 'recent-urls.json');
const RECENT_URL_LIMIT = clampInteger(process.env.RECENT_URL_LIMIT, 12, 1, 50);
const SESSION_TTL_MS = 60 * 60 * 1000;
const PLAYBACK_READY_TIMEOUT_MS = 15 * 1000;
const MAX_WS_BUFFER_BYTES = 12 * 1024 * 1024;
const JPEG_SOI = Buffer.from([0xff, 0xd8]);
const JPEG_EOI = Buffer.from([0xff, 0xd9]);
@@ -34,6 +37,7 @@ const defaults = {
const sessions = new Map();
const sourceTokens = new Map();
const playbacks = new Map();
let recentUrls = [];
let recentWrite = Promise.resolve();
@@ -88,6 +92,11 @@ app.post('/api/session', async (request, response) => {
app.all('/_source/:token', async (request, response) => {
const source = sourceTokens.get(request.params.token);
const session = source ? getSession(source.sessionId) : null;
const startedAt = Date.now();
const sourceLabel = source ? `${source.kind}:${shortId(source.sessionId)}` : `unknown:${request.params.token.slice(0, 8)}`;
let bytes = 0;
let upstreamEnded = false;
let upstreamStatus = 'pending';
if (!session) {
response.status(404).end();
@@ -95,7 +104,13 @@ app.all('/_source/:token', async (request, response) => {
}
const controller = new AbortController();
const cleanup = once(() => controller.abort());
const logClose = once(() => {
logInfo(`source closed kind=${sourceLabel} status=${upstreamStatus} bytes=${bytes} upstreamEnded=${upstreamEnded} durationMs=${Date.now() - startedAt}`);
});
const cleanup = once(() => {
controller.abort();
logClose();
});
response.on('close', cleanup);
try {
@@ -115,6 +130,9 @@ app.all('/_source/:token', async (request, response) => {
signal: controller.signal,
});
upstreamStatus = String(upstream.status);
logInfo(`source connected kind=${sourceLabel} status=${upstream.status} contentType=${upstream.headers.get('content-type') ?? 'unknown'}`);
response.status(upstream.status);
copyUpstreamHeaders(upstream.headers, response);
@@ -123,7 +141,15 @@ app.all('/_source/:token', async (request, response) => {
return;
}
Readable.fromWeb(upstream.body).on('error', (error) => {
Readable.fromWeb(upstream.body).on('data', (chunk) => {
bytes += chunk.length;
}).on('end', () => {
upstreamEnded = true;
}).on('error', (error) => {
if (!controller.signal.aborted) {
logWarn(`source stream error kind=${sourceLabel} error=${oneLine(error.message)}`);
}
if (!response.destroyed) {
response.destroy(error);
}
@@ -133,7 +159,7 @@ app.all('/_source/:token', async (request, response) => {
return;
}
console.error(`Source proxy failed: ${error.message}`);
logWarn(`source failed kind=${sourceLabel} error=${oneLine(error.message)}`);
if (!response.headersSent) {
response.status(502).json({ error: 'Failed to fetch source stream.' });
@@ -151,51 +177,7 @@ app.get('/audio/:sessionId', (request, response) => {
return;
}
const worker = createAudioWorker(session);
const releaseWorker = once(worker.release);
const ffmpeg = spawnFfmpeg(worker.args);
let stderr = '';
response.set({
'Cache-Control': 'no-store',
'Content-Type': 'audio/mpeg',
'X-Content-Type-Options': 'nosniff',
});
response.flushHeaders();
ffmpeg.stderr.on('data', (chunk) => {
stderr = appendTail(stderr, chunk);
});
ffmpeg.stdout.pipe(response);
ffmpeg.on('error', (error) => {
releaseWorker();
console.error(`Failed to start ffmpeg audio worker: ${error.message}`);
if (!response.writableEnded) {
response.end();
}
});
ffmpeg.on('close', (code, signal) => {
releaseWorker();
if (code && code !== 255) {
console.warn(`ffmpeg audio worker exited with code ${code}: ${redactSecrets(stderr)}`);
}
if (signal) {
console.warn(`ffmpeg audio worker stopped by ${signal}`);
}
if (!response.writableEnded) {
response.end();
}
});
const cleanup = once(() => stopProcess(ffmpeg));
request.on('close', cleanup);
response.on('close', cleanup);
getOrCreatePlayback(session).attachAudio(request, response);
});
server.on('upgrade', (request, socket, head) => {
@@ -222,100 +204,7 @@ wss.on('connection', (websocket, _request, sessionId) => {
return;
}
const worker = createFrameWorker(session);
const releaseWorker = once(worker.release);
const ffmpeg = spawnFfmpeg(worker.args);
const { fps, quality, width } = session.options;
let buffer = Buffer.alloc(0);
let frameIndex = 0;
let skippedFrames = 0;
let stderr = '';
let closedByClient = false;
sendJson(websocket, {
type: 'ready',
codec: 'jpeg',
fps,
quality,
width,
});
ffmpeg.stdout.on('data', (chunk) => {
buffer = Buffer.concat([buffer, chunk]);
for (;;) {
const start = buffer.indexOf(JPEG_SOI);
if (start === -1) {
buffer = Buffer.alloc(0);
return;
}
const end = buffer.indexOf(JPEG_EOI, start + JPEG_SOI.length);
if (end === -1) {
buffer = start === 0 ? buffer : buffer.subarray(start);
return;
}
const jpeg = buffer.subarray(start, end + JPEG_EOI.length);
buffer = buffer.subarray(end + JPEG_EOI.length);
const timestamp = frameIndex / fps;
frameIndex += 1;
if (websocket.readyState !== WebSocket.OPEN) {
return;
}
if (websocket.bufferedAmount > MAX_WS_BUFFER_BYTES) {
skippedFrames += 1;
continue;
}
const packet = Buffer.allocUnsafe(8 + jpeg.length);
packet.writeDoubleLE(timestamp, 0);
jpeg.copy(packet, 8);
websocket.send(packet, { binary: true });
}
});
ffmpeg.stderr.on('data', (chunk) => {
stderr = appendTail(stderr, chunk);
});
ffmpeg.on('error', (error) => {
releaseWorker();
sendJson(websocket, {
type: 'error',
message: `Failed to start ffmpeg: ${error.message}`,
});
websocket.close(1011, 'ffmpeg start failed');
});
ffmpeg.on('close', (code, signal) => {
releaseWorker();
if (websocket.readyState === WebSocket.OPEN && !closedByClient) {
sendJson(websocket, {
type: 'end',
code,
signal,
skippedFrames,
message: summarizeFfmpegExit(code, signal, stderr),
});
websocket.close(1000, 'ffmpeg exited');
}
});
websocket.on('close', () => {
closedByClient = true;
stopProcess(ffmpeg);
});
websocket.on('error', () => {
closedByClient = true;
stopProcess(ffmpeg);
});
getOrCreatePlayback(session).attachFrames(websocket);
});
setInterval(() => {
@@ -435,27 +324,246 @@ function getSession(sessionId) {
return session;
}
function createAudioWorker(session) {
const source = createSourceInput(session.id);
function getOrCreatePlayback(session) {
const existing = playbacks.get(session.id);
return {
args: buildAudioArgs(session, source.url),
release: source.release,
};
if (existing && !existing.closed) {
return existing;
}
const playback = createPlayback(session);
playbacks.set(session.id, playback);
return playback;
}
function createFrameWorker(session) {
const source = createSourceInput(session.id);
function createPlayback(session) {
const { fps, quality, width } = session.options;
const label = `playback:${shortId(session.id)}`;
let audioResponse = null;
let websocket = null;
let ffmpeg = null;
let logger = null;
let releaseSource = () => {};
let stderr = '';
let frameBuffer = Buffer.alloc(0);
let frameIndex = 0;
let skippedFrames = 0;
let stopReason = 'process_exit';
let started = false;
let closed = false;
let readyTimer = null;
return {
args: buildFrameArgs(session, source.url),
release: source.release,
const playback = {
get closed() {
return closed;
},
attachAudio(request, response) {
if (closed) {
response.status(410).end();
return;
}
if (audioResponse && !audioResponse.writableEnded) {
response.status(409).json({ error: 'Audio stream already attached for this session.' });
return;
}
audioResponse = response;
response.set({
'Cache-Control': 'no-store',
'Content-Type': 'audio/mpeg',
'X-Content-Type-Options': 'nosniff',
});
response.flushHeaders();
const cleanup = once(() => {
if (!closed) {
stop('client_disconnect');
}
});
request.on('close', cleanup);
response.on('close', cleanup);
maybeStart();
},
attachFrames(nextWebsocket) {
if (closed) {
nextWebsocket.close(1011, 'Playback closed');
return;
}
if (websocket && websocket.readyState === WebSocket.OPEN) {
nextWebsocket.close(1013, 'Frame stream already attached for this session');
return;
}
websocket = nextWebsocket;
sendJson(websocket, {
type: 'ready',
codec: 'jpeg',
fps,
quality,
width,
});
websocket.on('close', () => {
if (!closed) {
stop('client_disconnect');
}
});
websocket.on('error', () => {
if (!closed) {
stop('websocket_error');
}
});
maybeStart();
},
};
readyTimer = setTimeout(() => {
if (!started && !closed) {
stop('consumer_attach_timeout');
}
}, PLAYBACK_READY_TIMEOUT_MS).unref();
return playback;
function maybeStart() {
if (started || closed || !audioResponse || !isWebSocketOpen(websocket)) {
return;
}
started = true;
clearReadyTimer();
const source = createSourceInput(session.id, 'playback');
releaseSource = once(source.release);
ffmpeg = spawnFfmpeg(buildPlaybackArgs(session, source.url), ['ignore', 'pipe', 'pipe', 'pipe']);
logger = createFfmpegLogger('playback', session.id, ffmpeg.pid);
logger.start();
const frameStream = ffmpeg.stdio[3];
ffmpeg.stdout.pipe(audioResponse);
frameStream.on('data', handleFrameData);
ffmpeg.stderr.on('data', (chunk) => {
stderr = appendTail(stderr, chunk);
logger.stderr(chunk);
});
ffmpeg.on('error', (error) => {
stopReason = 'start_error';
logger.error(error);
finishPlayback(1011, `Failed to start ffmpeg: ${error.message}`, null, null);
});
ffmpeg.on('close', (code, signal) => {
logger.close(code, signal, stopReason, stderr);
finishPlayback(1000, 'ffmpeg exited', code, signal);
});
}
function handleFrameData(chunk) {
frameBuffer = Buffer.concat([frameBuffer, chunk]);
for (;;) {
const start = frameBuffer.indexOf(JPEG_SOI);
if (start === -1) {
frameBuffer = Buffer.alloc(0);
return;
}
const end = frameBuffer.indexOf(JPEG_EOI, start + JPEG_SOI.length);
if (end === -1) {
frameBuffer = start === 0 ? frameBuffer : frameBuffer.subarray(start);
return;
}
const jpeg = frameBuffer.subarray(start, end + JPEG_EOI.length);
frameBuffer = frameBuffer.subarray(end + JPEG_EOI.length);
const timestamp = frameIndex / fps;
frameIndex += 1;
if (!isWebSocketOpen(websocket)) {
return;
}
if (websocket.bufferedAmount > MAX_WS_BUFFER_BYTES) {
skippedFrames += 1;
continue;
}
const packet = Buffer.allocUnsafe(8 + jpeg.length);
packet.writeDoubleLE(timestamp, 0);
jpeg.copy(packet, 8);
websocket.send(packet, { binary: true }, (error) => {
if (error && !closed) {
stop('websocket_send_error');
}
});
}
}
function stop(reason) {
stopReason = reason;
if (ffmpeg && ffmpeg.exitCode === null && ffmpeg.signalCode === null) {
stopProcess(ffmpeg);
return;
}
finishPlayback(1000, 'playback stopped', null, null);
}
function finishPlayback(websocketCode, websocketReason, ffmpegCode, ffmpegSignal) {
if (closed) {
return;
}
closed = true;
clearReadyTimer();
releaseSource();
playbacks.delete(session.id);
if (audioResponse && !audioResponse.writableEnded) {
audioResponse.end();
}
if (isWebSocketOpen(websocket)) {
sendJson(websocket, {
type: 'end',
code: ffmpegCode,
signal: ffmpegSignal,
skippedFrames,
message: summarizeFfmpegExit(ffmpegCode, ffmpegSignal, stderr),
});
websocket.close(websocketCode, websocketReason);
}
logInfo(`playback closed kind=${label} reason=${stopReason} frames=${frameIndex} skippedFrames=${skippedFrames}`);
}
function clearReadyTimer() {
if (readyTimer) {
clearTimeout(readyTimer);
readyTimer = null;
}
}
}
function createSourceInput(sessionId) {
function isWebSocketOpen(websocket) {
return websocket?.readyState === WebSocket.OPEN;
}
function createSourceInput(sessionId, kind) {
const token = randomUUID();
sourceTokens.set(token, { sessionId, createdAt: Date.now() });
sourceTokens.set(token, { sessionId, kind, createdAt: Date.now() });
return {
url: `http://127.0.0.1:${getListeningPort()}/_source/${token}`,
@@ -468,18 +576,24 @@ function getListeningPort() {
return typeof address === 'object' && address ? address.port : PORT;
}
function buildAudioArgs(session, inputUrl) {
function buildPlaybackArgs(session, inputUrl) {
const { fps, quality, width } = session.options;
const videoFilter = `fps=${fps},scale='min(${width},iw)':-2:flags=bicubic`;
return [
'-hide_banner',
'-nostdin',
'-loglevel',
'warning',
FFMPEG_LOG_LEVEL,
'-nostats',
'-seekable',
FFMPEG_INPUT_SEEKABLE,
'-re',
'-i',
inputUrl,
'-vn',
'-map',
'0:a:0?',
'-vn',
'-ac',
'2',
'-ar',
@@ -491,21 +605,8 @@ function buildAudioArgs(session, inputUrl) {
'-f',
'mp3',
'pipe:1',
];
}
function buildFrameArgs(session, inputUrl) {
const { fps, quality, width } = session.options;
const videoFilter = `fps=${fps},scale='min(${width},iw)':-2:flags=bicubic`;
return [
'-hide_banner',
'-nostdin',
'-loglevel',
'warning',
'-re',
'-i',
inputUrl,
'-map',
'0:v:0',
'-an',
'-vf',
videoFilter,
@@ -515,16 +616,69 @@ function buildFrameArgs(session, inputUrl) {
String(quality),
'-f',
'image2pipe',
'pipe:1',
'pipe:3',
];
}
function spawnFfmpeg(args) {
function spawnFfmpeg(args, stdio = ['ignore', 'pipe', 'pipe']) {
return spawn(FFMPEG_PATH, args, {
stdio: ['ignore', 'pipe', 'pipe'],
stdio,
});
}
function createFfmpegLogger(kind, sessionId, pid) {
const startedAt = Date.now();
const label = `${kind}:${shortId(sessionId)}`;
const lineLogger = createLineLogger((line) => {
logWarn(`ffmpeg stderr kind=${label} pid=${pid ?? 'unknown'} line="${oneLine(redactSecrets(line))}"`);
});
return {
start() {
logInfo(`ffmpeg started kind=${label} pid=${pid ?? 'unknown'} loglevel=${FFMPEG_LOG_LEVEL}`);
},
stderr(chunk) {
lineLogger.write(chunk);
},
error(error) {
logWarn(`ffmpeg start error kind=${label} pid=${pid ?? 'unknown'} error=${oneLine(error.message)}`);
},
close(code, signal, reason, stderr) {
lineLogger.flush();
const level = reason === 'client_disconnect' || code === 0 || signal === 'SIGTERM' ? 'info' : 'warn';
const tail = redactSecrets(stderr).trim();
const tailText = tail ? ` stderrTail="${oneLine(tail)}"` : '';
log(level, `ffmpeg exited kind=${label} pid=${pid ?? 'unknown'} code=${code ?? 'null'} signal=${signal ?? 'none'} reason=${reason} durationMs=${Date.now() - startedAt}${tailText}`);
},
};
}
function createLineLogger(callback) {
let buffer = '';
return {
write(chunk) {
buffer += chunk.toString('utf8');
const lines = buffer.split(/\r?\n/);
buffer = lines.pop() ?? '';
for (const line of lines) {
if (line.trim()) {
callback(line);
}
}
},
flush() {
if (buffer.trim()) {
callback(buffer);
}
buffer = '';
},
};
}
function copyUpstreamHeaders(upstreamHeaders, response) {
for (const header of [
'accept-ranges',
@@ -555,6 +709,33 @@ function appendTail(current, chunk) {
return next.length > 4000 ? next.slice(-4000) : next;
}
function shortId(id) {
return id.slice(0, 8);
}
function oneLine(value) {
return String(value).replace(/\s+/g, ' ').trim().slice(0, 1200);
}
function logInfo(message) {
log('info', message);
}
function logWarn(message) {
log('warn', message);
}
function log(level, message) {
const output = `${new Date().toISOString()} ${message}`;
if (level === 'warn') {
console.warn(output);
return;
}
console.log(output);
}
function summarizeFfmpegExit(code, signal, stderr) {
if (signal) {
return `ffmpeg stopped by ${signal}.`;