Add YouTube playback and queue flow

This commit is contained in:
2026-06-11 21:13:48 -07:00
parent 2866d33dec
commit 20f6d4d192
8 changed files with 612 additions and 101 deletions

View File

@@ -19,6 +19,9 @@ const wss = new WebSocketServer({ noServer: true, perMessageDeflate: false });
const PORT = Number(process.env.PORT ?? 3000);
const FFMPEG_PATH = process.env.FFMPEG_PATH ?? 'ffmpeg';
const FFPROBE_PATH = process.env.FFPROBE_PATH ?? 'ffprobe';
const YT_DLP_PATH = process.env.YT_DLP_PATH ?? 'yt-dlp';
const YT_DLP_FORMAT = process.env.YT_DLP_FORMAT ?? 'best[ext=mp4][vcodec!=none][acodec!=none]/best[vcodec!=none][acodec!=none]/best';
const YT_DLP_TIMEOUT_MS = clampInteger(process.env.YT_DLP_TIMEOUT_MS, 45 * 1000, 5 * 1000, 120 * 1000);
const FFMPEG_LOG_LEVEL = process.env.FFMPEG_LOG_LEVEL ?? 'warning';
const FFMPEG_INPUT_SEEKABLE = process.env.FFMPEG_INPUT_SEEKABLE ?? '0';
const FFMPEG_HTTP_RECONNECT = parseBoolean(process.env.FFMPEG_HTTP_RECONNECT, true);
@@ -39,6 +42,7 @@ const FRAME_CONDITION_LOG_INTERVAL_MS = 5000;
const MAX_WS_BUFFER_BYTES = clampInteger(process.env.MAX_WS_BUFFER_BYTES, 2 * 1024 * 1024, 128 * 1024, 64 * 1024 * 1024);
const MAX_AUDIO_QUEUE_BYTES = clampInteger(process.env.MAX_AUDIO_QUEUE_BYTES, 4 * 1024 * 1024, 256 * 1024, 128 * 1024 * 1024);
const MAX_RELAY_BRANCH_QUEUE_BYTES = clampInteger(process.env.MAX_RELAY_BRANCH_QUEUE_BYTES, 8 * 1024 * 1024, 512 * 1024, 256 * 1024 * 1024);
const MAX_YT_DLP_OUTPUT_BYTES = 8 * 1024 * 1024;
const RELAY_BRANCH_PAUSE_BYTES = Math.floor(MAX_RELAY_BRANCH_QUEUE_BYTES / 2);
const JPEG_SOI = Buffer.from([0xff, 0xd8]);
const JPEG_EOI = Buffer.from([0xff, 0xd9]);
@@ -78,6 +82,33 @@ app.get('/api/recent-urls', (_request, response) => {
});
});
app.post('/api/recent-urls', async (request, response) => {
let url;
try {
url = parseStreamUrl(request.body?.url);
} catch (error) {
response.status(400).json({ error: error.message });
return;
}
try {
await addRecentUrl(url);
} catch (error) {
console.warn(`Failed to store recent URL: ${error.message}`);
response.status(500).json({ error: 'Failed to store recent URL.' });
return;
}
response.status(201).json({
urls: recentUrls.map((item) => ({
url: item.url,
displayUrl: redactSecrets(item.url),
lastPlayedAt: item.lastPlayedAt,
})),
});
});
app.get('/api/favorites', (_request, response) => {
response.json({ favorites: formatFavoritesPayload() });
});
@@ -108,6 +139,7 @@ app.put('/api/favorites', async (request, response) => {
app.post('/api/session', async (request, response) => {
let url;
let source;
try {
url = parseStreamUrl(request.body?.url);
@@ -116,6 +148,13 @@ app.post('/api/session', async (request, response) => {
return;
}
try {
source = await resolvePlaybackSource(url);
} catch (error) {
response.status(400).json({ error: error.message });
return;
}
const options = parsePlaybackOptions(request.body);
const id = randomUUID();
@@ -123,7 +162,10 @@ app.post('/api/session', async (request, response) => {
sessions.set(id, {
id,
url,
url: source.url,
originalUrl: url,
sourceKind: source.kind,
sourceHeaders: source.headers,
options,
duration: null,
metadataStatus,
@@ -133,7 +175,7 @@ app.post('/api/session', async (request, response) => {
lastUsedAt: Date.now(),
});
logInfo(`session created id=${shortId(id)} mode=${getSessionPlaybackConnectionMode(sessions.get(id))} fps=${options.fps} width=${options.width} quality=${options.quality}`);
logInfo(`session created id=${shortId(id)} source=${source.kind} mode=${getSessionPlaybackConnectionMode(sessions.get(id))} fps=${options.fps} width=${options.width} quality=${options.quality}`);
if (METADATA_PROBE_ENABLED) {
startSessionMetadataProbe(sessions.get(id));
@@ -222,7 +264,7 @@ app.all('/_source/:token', async (request, response) => {
response.on('close', cleanup);
try {
const headers = new Headers();
const headers = createSourceRequestHeaders(session);
const range = request.get('range');
if (range) {
@@ -374,6 +416,205 @@ function parseStreamUrl(value) {
return parsed.toString();
}
function parseResolvedMediaUrl(value) {
if (typeof value !== 'string' || value.trim().length === 0) {
throw new Error('Resolved media URL is empty.');
}
if (value.length > 32768) {
throw new Error('Resolved media URL is too long.');
}
const parsed = new URL(value.trim());
if (!['http:', 'https:'].includes(parsed.protocol)) {
throw new Error('Resolved media URL is not an HTTP(S) stream.');
}
return parsed.toString();
}
async function resolvePlaybackSource(url) {
if (!isYouTubeUrl(url)) {
return {
kind: 'direct',
url,
headers: {},
};
}
const resolved = await resolveYouTubeSource(url);
return {
kind: 'youtube',
url: resolved.url,
headers: resolved.headers,
};
}
function isYouTubeUrl(value) {
let parsed;
try {
parsed = new URL(value);
} catch {
return false;
}
const hostname = parsed.hostname.toLowerCase();
return hostname === 'youtu.be'
|| hostname.endsWith('.youtu.be')
|| hostname === 'youtube.com'
|| hostname.endsWith('.youtube.com')
|| hostname === 'youtube-nocookie.com'
|| hostname.endsWith('.youtube-nocookie.com');
}
async function resolveYouTubeSource(url) {
const startedAt = Date.now();
logInfo(`yt-dlp resolving source host=${new URL(url).hostname}`);
let payload;
try {
payload = await runYtDlpJson([
'--no-playlist',
'--no-warnings',
'--no-progress',
'--dump-single-json',
'--format',
YT_DLP_FORMAT,
url,
]);
} catch (error) {
throw new Error(`yt-dlp failed to resolve YouTube URL: ${error.message}`);
}
const mediaUrl = typeof payload?.url === 'string' ? payload.url : '';
if (!mediaUrl) {
throw new Error('yt-dlp did not return a playable media URL.');
}
const parsed = parseResolvedMediaUrl(mediaUrl);
logInfo(`yt-dlp resolved source host=${new URL(parsed).hostname} durationMs=${Date.now() - startedAt}`);
return {
url: parsed,
headers: normalizeSourceHeaders(payload?.http_headers),
};
}
function runYtDlpJson(args) {
return new Promise((resolve, reject) => {
const ytDlp = spawn(YT_DLP_PATH, args, {
stdio: ['ignore', 'pipe', 'pipe'],
});
let stdout = '';
let stdoutBytes = 0;
let stderr = '';
let timedOut = false;
let settled = false;
const finish = (callback) => {
if (settled) {
return;
}
settled = true;
clearTimeout(timer);
callback();
};
const timer = setTimeout(() => {
timedOut = true;
stopProcess(ytDlp);
}, YT_DLP_TIMEOUT_MS);
timer.unref();
ytDlp.stdout.on('data', (chunk) => {
stdoutBytes += chunk.length;
if (stdoutBytes > MAX_YT_DLP_OUTPUT_BYTES) {
finish(() => {
stopProcess(ytDlp);
reject(new Error('yt-dlp output was too large.'));
});
return;
}
stdout += chunk.toString('utf8');
});
ytDlp.stderr.on('data', (chunk) => {
stderr = appendTail(stderr, chunk);
});
ytDlp.on('error', (error) => {
finish(() => {
if (error.code === 'ENOENT') {
reject(new Error(`yt-dlp was not found at "${YT_DLP_PATH}".`));
return;
}
reject(error);
});
});
ytDlp.on('close', (code, signal) => {
finish(() => {
if (timedOut) {
reject(new Error('yt-dlp timed out.'));
return;
}
if (code !== 0) {
const detail = redactSecrets(stderr).trim();
reject(new Error(detail ? `yt-dlp exited with code ${code}: ${detail}` : `yt-dlp exited with code ${code ?? 'null'} signal ${signal ?? 'none'}.`));
return;
}
try {
resolve(JSON.parse(stdout));
} catch (error) {
reject(new Error(`yt-dlp returned invalid JSON: ${error.message}`));
}
});
});
});
}
function normalizeSourceHeaders(value) {
if (!value || typeof value !== 'object' || Array.isArray(value)) {
return {};
}
const headers = {};
for (const [name, rawValue] of Object.entries(value)) {
const normalizedName = name.toLowerCase();
if (!isAllowedSourceHeader(normalizedName) || typeof rawValue !== 'string') {
continue;
}
headers[normalizedName] = rawValue;
}
return headers;
}
function isAllowedSourceHeader(name) {
return ![
'accept-encoding',
'connection',
'content-length',
'host',
'range',
'transfer-encoding',
].includes(name);
}
function parsePlaybackOptions(body) {
return {
fps: clampInteger(body?.fps, defaults.fps, 1, 30),
@@ -1153,7 +1394,7 @@ function createRelayPlayback(session) {
sourceController = new AbortController();
try {
const headers = new Headers();
const headers = createSourceRequestHeaders(session);
headers.set('accept-encoding', 'identity');
const upstream = await fetch(session.url, {
@@ -2470,6 +2711,16 @@ function createLineLogger(callback) {
};
}
function createSourceRequestHeaders(session) {
const headers = new Headers();
for (const [name, value] of Object.entries(session.sourceHeaders ?? {})) {
headers.set(name, value);
}
return headers;
}
function copyUpstreamHeaders(upstreamHeaders, response) {
for (const header of [
'accept-ranges',
@@ -2541,7 +2792,7 @@ function summarizeFfmpegExit(code, signal, stderr) {
}
function redactSecrets(text) {
return text.replace(/([?&](?:api_key|apikey|access_token|token|key)=)[^&\s]+/gi, '$1[redacted]');
return text.replace(/([?&](?:api_key|apikey|access_token|token|key|sig|signature|lsig|n|expire|ip)=)[^&\s]+/gi, '$1[redacted]');
}
function isChildRunning(child) {