adds seeking support

This commit is contained in:
2026-05-02 18:53:35 -07:00
parent acd73436a7
commit a3429dee85
4 changed files with 604 additions and 16 deletions

View File

@@ -18,12 +18,14 @@ const wss = new WebSocketServer({ noServer: true });
const PORT = Number(process.env.PORT ?? 3000);
const FFMPEG_PATH = process.env.FFMPEG_PATH ?? 'ffmpeg';
const FFPROBE_PATH = process.env.FFPROBE_PATH ?? 'ffprobe';
const FFMPEG_LOG_LEVEL = process.env.FFMPEG_LOG_LEVEL ?? 'warning';
const FFMPEG_INPUT_SEEKABLE = process.env.FFMPEG_INPUT_SEEKABLE ?? '0';
const PLAYBACK_CONNECTION_MODE = parsePlaybackConnectionMode(process.env.PLAYBACK_CONNECTION_MODE ?? process.env.PLAYBACK_MODE);
const RECENT_URLS_PATH = process.env.RECENT_URLS_PATH ?? path.join(__dirname, '..', 'data', 'recent-urls.json');
const RECENT_URL_LIMIT = clampInteger(process.env.RECENT_URL_LIMIT, 12, 1, 50);
const SESSION_TTL_MS = 60 * 60 * 1000;
const METADATA_PROBE_TIMEOUT_MS = 8 * 1000;
const PLAYBACK_READY_TIMEOUT_MS = 15 * 1000;
const MAX_WS_BUFFER_BYTES = 12 * 1024 * 1024;
const MAX_AUDIO_QUEUE_BYTES = clampInteger(process.env.MAX_AUDIO_QUEUE_BYTES, 16 * 1024 * 1024, 256 * 1024, 128 * 1024 * 1024);
@@ -80,9 +82,14 @@ app.post('/api/session', async (request, response) => {
id,
url,
options,
duration: null,
metadataStatus: 'pending',
seekSeconds: 0,
seekGeneration: 0,
createdAt: Date.now(),
lastUsedAt: Date.now(),
});
startSessionMetadataProbe(sessions.get(id));
try {
await addRecentUrl(url);
@@ -90,7 +97,56 @@ app.post('/api/session', async (request, response) => {
console.warn(`Failed to store recent URL: ${error.message}`);
}
response.status(201).json({ id, options });
response.status(201).json(formatSessionPayload(sessions.get(id)));
});
app.get('/api/session/:sessionId', (request, response) => {
const session = getSession(request.params.sessionId);
if (!session) {
response.status(404).json({ error: 'Unknown or expired session.' });
return;
}
response.json(formatSessionPayload(session));
});
app.post('/api/session/:sessionId/seek', (request, response) => {
const session = getSession(request.params.sessionId);
if (!session) {
response.status(404).json({ error: 'Unknown or expired session.' });
return;
}
if (!isSessionSeekable(session)) {
response.status(409).json({ error: 'This stream is not seekable.' });
return;
}
const requestedSeconds = Number(request.body?.time);
if (!Number.isFinite(requestedSeconds)) {
response.status(400).json({ error: 'A numeric seek time is required.' });
return;
}
const seekSeconds = clampNumber(requestedSeconds, 0, session.duration);
const playback = playbacks.get(session.id);
session.seekSeconds = seekSeconds;
session.seekGeneration += 1;
session.lastUsedAt = Date.now();
if (playback && !playback.closed) {
playback.stop('client_seek');
if (playbacks.get(session.id) === playback) {
playbacks.delete(session.id);
}
}
logInfo(`session seeked id=${shortId(session.id)} time=${seekSeconds.toFixed(3)} duration=${session.duration.toFixed(3)}`);
response.json(formatSessionPayload(session));
});
app.all('/_source/:token', async (request, response) => {
@@ -297,6 +353,10 @@ function clampInteger(value, fallback, min, max) {
return Math.min(max, Math.max(min, Math.round(parsed)));
}
function clampNumber(value, min, max) {
return Math.min(max, Math.max(min, value));
}
function parseAudioBitrate(value) {
if (typeof value !== 'string') {
return defaults.audioBitrate;
@@ -305,6 +365,124 @@ function parseAudioBitrate(value) {
return /^\d{2,3}k$/i.test(value) ? value.toLowerCase() : defaults.audioBitrate;
}
function formatSessionPayload(session) {
return {
id: session.id,
options: session.options,
duration: Number.isFinite(session.duration) ? session.duration : null,
metadataStatus: session.metadataStatus,
seekable: isSessionSeekable(session),
seekSeconds: session.seekSeconds,
seekGeneration: session.seekGeneration,
};
}
function isSessionSeekable(session) {
return PLAYBACK_CONNECTION_MODE !== 'relay' && Number.isFinite(session.duration) && session.duration > 0;
}
function startSessionMetadataProbe(session) {
void probeSessionDuration(session).then((duration) => {
const current = sessions.get(session.id);
if (current !== session) {
return;
}
session.duration = duration;
session.metadataStatus = Number.isFinite(duration) ? 'ready' : 'unavailable';
}).catch((error) => {
const current = sessions.get(session.id);
if (current !== session) {
return;
}
session.duration = null;
session.metadataStatus = 'unavailable';
logWarn(`duration probe failed id=${shortId(session.id)} error=${oneLine(error.message)}`);
});
}
async function probeSessionDuration(session) {
const source = createSourceInput(session.id, 'probe');
try {
const output = await runFfprobe([
'-v',
'error',
'-show_entries',
'format=duration',
'-of',
'json',
source.url,
]);
const payload = JSON.parse(output);
const duration = Number(payload?.format?.duration);
return Number.isFinite(duration) && duration > 0 ? duration : null;
} finally {
source.release();
}
}
function runFfprobe(args) {
return new Promise((resolve, reject) => {
const ffprobe = spawn(FFPROBE_PATH, args, {
stdio: ['ignore', 'pipe', 'pipe'],
});
let stdout = '';
let stderr = '';
let timedOut = false;
let settled = false;
const finish = (callback) => {
if (settled) {
return;
}
settled = true;
clearTimeout(timer);
callback();
};
const timer = setTimeout(() => {
timedOut = true;
stopProcess(ffprobe);
}, METADATA_PROBE_TIMEOUT_MS);
timer.unref();
ffprobe.stdout.on('data', (chunk) => {
stdout = appendTail(stdout, chunk);
});
ffprobe.stderr.on('data', (chunk) => {
stderr = appendTail(stderr, chunk);
});
ffprobe.on('error', (error) => {
finish(() => reject(error));
});
ffprobe.on('close', (code, signal) => {
finish(() => {
if (timedOut) {
reject(new Error('ffprobe timed out.'));
return;
}
if (code === 0) {
resolve(stdout);
return;
}
const detail = redactSecrets(stderr).trim();
reject(new Error(detail ? `ffprobe exited with code ${code}: ${detail}` : `ffprobe exited with code ${code ?? 'null'} signal ${signal ?? 'none'}.`));
});
});
});
}
async function loadRecentUrls() {
try {
const raw = await fs.readFile(RECENT_URLS_PATH, 'utf8');
@@ -443,6 +621,9 @@ function streamSplitFrames(websocket, session) {
fps,
quality,
width,
duration: session.duration,
seekable: isSessionSeekable(session),
seekSeconds: session.seekSeconds,
});
ffmpeg.stdout.on('data', handleFrameData);
@@ -576,6 +757,9 @@ function createRelayPlayback(session) {
get closed() {
return closed;
},
stop(reason = 'playback_stopped') {
stop(reason);
},
attachAudio(request, response) {
if (closed) {
response.status(410).end();
@@ -624,6 +808,9 @@ function createRelayPlayback(session) {
fps,
quality,
width,
duration: session.duration,
seekable: isSessionSeekable(session),
seekSeconds: session.seekSeconds,
});
websocket.on('close', () => {
@@ -888,7 +1075,9 @@ function createRelayPlayback(session) {
sourceController?.abort();
audioInput?.destroy();
frameInput?.destroy();
playbacks.delete(session.id);
if (playbacks.get(session.id) === playback) {
playbacks.delete(session.id);
}
if (audioResponse && !audioResponse.writableEnded) {
audioResponse.end();
@@ -939,6 +1128,9 @@ function createPlayback(session) {
get closed() {
return closed;
},
stop(reason = 'playback_stopped') {
stop(reason);
},
attachAudio(request, response) {
if (closed) {
response.status(410).end();
@@ -987,6 +1179,9 @@ function createPlayback(session) {
fps,
quality,
width,
duration: session.duration,
seekable: isSessionSeekable(session),
seekSeconds: session.seekSeconds,
});
websocket.on('close', () => {
@@ -1184,7 +1379,9 @@ function createPlayback(session) {
closed = true;
clearReadyTimer();
releaseSource();
playbacks.delete(session.id);
if (playbacks.get(session.id) === playback) {
playbacks.delete(session.id);
}
audioQueue = [];
audioQueueBytes = 0;
@@ -1252,16 +1449,16 @@ function createFrameWorker(session) {
}
function buildRelayAudioArgs(session) {
return buildAudioArgs(session, 'pipe:0', { seekable: false });
return buildAudioArgs(session, 'pipe:0', { seekable: false, startTime: 0 });
}
function buildRelayFrameArgs(session) {
return buildFrameArgs(session, 'pipe:0', { seekable: false });
return buildFrameArgs(session, 'pipe:0', { seekable: false, startTime: 0 });
}
function buildPlaybackArgs(session, inputUrl) {
return [
...buildInputArgs(inputUrl),
...buildInputArgs(inputUrl, { startTime: session.seekSeconds }),
'-map',
'0:a:0?',
'-vn',
@@ -1284,7 +1481,7 @@ function buildPlaybackArgs(session, inputUrl) {
function buildAudioArgs(session, inputUrl, inputOptions) {
return [
...buildInputArgs(inputUrl, inputOptions),
...buildInputArgs(inputUrl, { ...inputOptions, startTime: inputOptions?.startTime ?? session.seekSeconds }),
'-map',
'0:a:0?',
'-vn',
@@ -1304,14 +1501,14 @@ function buildAudioArgs(session, inputUrl, inputOptions) {
function buildFrameArgs(session, inputUrl, inputOptions) {
return [
...buildInputArgs(inputUrl, inputOptions),
...buildInputArgs(inputUrl, { ...inputOptions, startTime: inputOptions?.startTime ?? session.seekSeconds }),
'-map',
'0:v:0',
...buildFrameOutputArgs(session, 'pipe:1'),
];
}
function buildInputArgs(inputUrl, { seekable = true } = {}) {
function buildInputArgs(inputUrl, { seekable = true, startTime = 0 } = {}) {
const args = [
'-hide_banner',
'-nostdin',
@@ -1321,13 +1518,21 @@ function buildInputArgs(inputUrl, { seekable = true } = {}) {
];
if (seekable) {
args.push('-seekable', FFMPEG_INPUT_SEEKABLE);
args.push('-seekable', startTime > 0 ? '1' : FFMPEG_INPUT_SEEKABLE);
}
if (startTime > 0) {
args.push('-ss', formatFfmpegSeconds(startTime));
}
args.push('-re', '-i', inputUrl);
return args;
}
function formatFfmpegSeconds(seconds) {
return clampNumber(seconds, 0, Number.MAX_SAFE_INTEGER).toFixed(3);
}
function buildFrameOutputArgs(session, outputUrl) {
const { fps, quality, width } = session.options;
const videoFilter = `fps=${fps},scale=w='min(${width},iw)':h=-2:flags=bicubic:out_range=pc,format=yuvj420p`;