Files
carplay/server/index.js

880 lines
22 KiB
JavaScript

import { spawn } from 'node:child_process';
import { randomUUID } from 'node:crypto';
import fs from 'node:fs/promises';
import { createServer } from 'node:http';
import path from 'node:path';
import { Readable } from 'node:stream';
import { fileURLToPath } from 'node:url';
import express from 'express';
import { WebSocket, WebSocketServer } from 'ws';
const __dirname = path.dirname(fileURLToPath(import.meta.url));
const publicDir = path.join(__dirname, '..', 'public');
const app = express();
const server = createServer(app);
const wss = new WebSocketServer({ noServer: true });
const PORT = Number(process.env.PORT ?? 3000);
const FFMPEG_PATH = process.env.FFMPEG_PATH ?? 'ffmpeg';
const FFMPEG_LOG_LEVEL = process.env.FFMPEG_LOG_LEVEL ?? 'warning';
const FFMPEG_INPUT_SEEKABLE = process.env.FFMPEG_INPUT_SEEKABLE ?? '0';
const RECENT_URLS_PATH = process.env.RECENT_URLS_PATH ?? path.join(__dirname, '..', 'data', 'recent-urls.json');
const RECENT_URL_LIMIT = clampInteger(process.env.RECENT_URL_LIMIT, 12, 1, 50);
const SESSION_TTL_MS = 60 * 60 * 1000;
const PLAYBACK_READY_TIMEOUT_MS = 15 * 1000;
const MAX_WS_BUFFER_BYTES = 12 * 1024 * 1024;
const MAX_AUDIO_QUEUE_BYTES = clampInteger(process.env.MAX_AUDIO_QUEUE_BYTES, 16 * 1024 * 1024, 256 * 1024, 128 * 1024 * 1024);
const JPEG_SOI = Buffer.from([0xff, 0xd8]);
const JPEG_EOI = Buffer.from([0xff, 0xd9]);
const defaults = {
fps: 24,
width: 960,
quality: 5,
audioBitrate: '160k',
};
const sessions = new Map();
const sourceTokens = new Map();
const playbacks = new Map();
let recentUrls = [];
let recentWrite = Promise.resolve();
app.disable('x-powered-by');
app.use(express.json({ limit: '32kb' }));
app.use(express.static(publicDir));
app.get('/api/health', (_request, response) => {
response.json({ ok: true, ffmpeg: FFMPEG_PATH });
});
app.get('/api/recent-urls', (_request, response) => {
response.json({
urls: recentUrls.map((item) => ({
url: item.url,
displayUrl: redactSecrets(item.url),
lastPlayedAt: item.lastPlayedAt,
})),
});
});
app.post('/api/session', async (request, response) => {
let url;
try {
url = parseStreamUrl(request.body?.url);
} catch (error) {
response.status(400).json({ error: error.message });
return;
}
const options = parsePlaybackOptions(request.body);
const id = randomUUID();
sessions.set(id, {
id,
url,
options,
createdAt: Date.now(),
lastUsedAt: Date.now(),
});
try {
await addRecentUrl(url);
} catch (error) {
console.warn(`Failed to store recent URL: ${error.message}`);
}
response.status(201).json({ id, options });
});
app.all('/_source/:token', async (request, response) => {
const source = sourceTokens.get(request.params.token);
const session = source ? getSession(source.sessionId) : null;
const startedAt = Date.now();
const sourceLabel = source ? `${source.kind}:${shortId(source.sessionId)}` : `unknown:${request.params.token.slice(0, 8)}`;
let bytes = 0;
let upstreamEnded = false;
let upstreamStatus = 'pending';
if (!session) {
response.status(404).end();
return;
}
const controller = new AbortController();
const logClose = once(() => {
logInfo(`source closed kind=${sourceLabel} status=${upstreamStatus} bytes=${bytes} upstreamEnded=${upstreamEnded} durationMs=${Date.now() - startedAt}`);
});
const cleanup = once(() => {
controller.abort();
logClose();
});
response.on('close', cleanup);
try {
const headers = new Headers();
const range = request.get('range');
if (range) {
headers.set('range', range);
}
headers.set('accept-encoding', 'identity');
const upstream = await fetch(session.url, {
method: request.method === 'HEAD' ? 'HEAD' : 'GET',
headers,
redirect: 'follow',
signal: controller.signal,
});
upstreamStatus = String(upstream.status);
logInfo(`source connected kind=${sourceLabel} status=${upstream.status} contentType=${upstream.headers.get('content-type') ?? 'unknown'}`);
response.status(upstream.status);
copyUpstreamHeaders(upstream.headers, response);
if (request.method === 'HEAD' || !upstream.body) {
response.end();
return;
}
Readable.fromWeb(upstream.body).on('data', (chunk) => {
bytes += chunk.length;
}).on('end', () => {
upstreamEnded = true;
}).on('error', (error) => {
if (!controller.signal.aborted) {
logWarn(`source stream error kind=${sourceLabel} error=${oneLine(error.message)}`);
}
if (!response.destroyed) {
response.destroy(error);
}
}).pipe(response);
} catch (error) {
if (controller.signal.aborted) {
return;
}
logWarn(`source failed kind=${sourceLabel} error=${oneLine(error.message)}`);
if (!response.headersSent) {
response.status(502).json({ error: 'Failed to fetch source stream.' });
} else {
response.destroy(error);
}
}
});
app.get('/audio/:sessionId', (request, response) => {
const session = getSession(request.params.sessionId);
if (!session) {
response.status(404).json({ error: 'Unknown or expired session.' });
return;
}
getOrCreatePlayback(session).attachAudio(request, response);
});
server.on('upgrade', (request, socket, head) => {
const host = request.headers.host ?? 'localhost';
const { pathname } = new URL(request.url ?? '/', `http://${host}`);
const match = pathname.match(/^\/frames\/([0-9a-f-]+)$/i);
if (!match || !getSession(match[1])) {
socket.write('HTTP/1.1 404 Not Found\r\n\r\n');
socket.destroy();
return;
}
wss.handleUpgrade(request, socket, head, (websocket) => {
wss.emit('connection', websocket, request, match[1]);
});
});
wss.on('connection', (websocket, _request, sessionId) => {
const session = getSession(sessionId);
if (!session) {
websocket.close(1008, 'Unknown session');
return;
}
getOrCreatePlayback(session).attachFrames(websocket);
});
setInterval(() => {
const now = Date.now();
for (const [id, session] of sessions) {
if (now - session.lastUsedAt > SESSION_TTL_MS) {
sessions.delete(id);
}
}
for (const [token, source] of sourceTokens) {
if (now - source.createdAt > SESSION_TTL_MS) {
sourceTokens.delete(token);
}
}
}, 60 * 1000).unref();
await loadRecentUrls();
server.listen(PORT, () => {
console.log(`Frame stream app listening at http://localhost:${PORT}`);
});
function parseStreamUrl(value) {
if (typeof value !== 'string' || value.trim().length === 0) {
throw new Error('A stream URL is required.');
}
if (value.length > 4096) {
throw new Error('The stream URL is too long.');
}
const parsed = new URL(value.trim());
if (!['http:', 'https:'].includes(parsed.protocol)) {
throw new Error('Only http and https stream URLs are supported.');
}
return parsed.toString();
}
function parsePlaybackOptions(body) {
return {
fps: clampInteger(body?.fps, defaults.fps, 1, 30),
width: clampInteger(body?.width, defaults.width, 160, 1920),
quality: clampInteger(body?.quality, defaults.quality, 2, 18),
audioBitrate: parseAudioBitrate(body?.audioBitrate),
};
}
function clampInteger(value, fallback, min, max) {
const parsed = Number(value);
if (!Number.isFinite(parsed)) {
return fallback;
}
return Math.min(max, Math.max(min, Math.round(parsed)));
}
function parseAudioBitrate(value) {
if (typeof value !== 'string') {
return defaults.audioBitrate;
}
return /^\d{2,3}k$/i.test(value) ? value.toLowerCase() : defaults.audioBitrate;
}
async function loadRecentUrls() {
try {
const raw = await fs.readFile(RECENT_URLS_PATH, 'utf8');
const parsed = JSON.parse(raw);
const items = Array.isArray(parsed?.urls) ? parsed.urls : [];
recentUrls = items
.filter((item) => typeof item?.url === 'string' && typeof item?.lastPlayedAt === 'string')
.slice(0, RECENT_URL_LIMIT);
} catch (error) {
if (error.code !== 'ENOENT') {
console.warn(`Failed to read recent URLs: ${error.message}`);
}
recentUrls = [];
}
}
async function addRecentUrl(url) {
const lastPlayedAt = new Date().toISOString();
recentUrls = [
{ url, lastPlayedAt },
...recentUrls.filter((item) => item.url !== url),
].slice(0, RECENT_URL_LIMIT);
recentWrite = recentWrite.catch(() => {}).then(() => saveRecentUrls());
await recentWrite;
}
async function saveRecentUrls() {
const directory = path.dirname(RECENT_URLS_PATH);
const temporaryPath = `${RECENT_URLS_PATH}.${process.pid}.tmp`;
const payload = `${JSON.stringify({ urls: recentUrls }, null, 2)}\n`;
await fs.mkdir(directory, { recursive: true });
await fs.writeFile(temporaryPath, payload, 'utf8');
await fs.rename(temporaryPath, RECENT_URLS_PATH);
}
function getSession(sessionId) {
const session = sessions.get(sessionId);
if (!session) {
return null;
}
session.lastUsedAt = Date.now();
return session;
}
function getOrCreatePlayback(session) {
const existing = playbacks.get(session.id);
if (existing && !existing.closed) {
return existing;
}
const playback = createPlayback(session);
playbacks.set(session.id, playback);
return playback;
}
function createPlayback(session) {
const { fps, quality, width } = session.options;
const label = `playback:${shortId(session.id)}`;
let audioResponse = null;
let audioQueue = [];
let audioQueueBytes = 0;
let audioQueuePeakBytes = 0;
let audioBackpressureCount = 0;
let audioDrainCount = 0;
let audioDrainPending = false;
let websocket = null;
let ffmpeg = null;
let logger = null;
let releaseSource = () => {};
let stderr = '';
let frameBuffer = Buffer.alloc(0);
let frameIndex = 0;
let skippedFrames = 0;
let stopReason = 'process_exit';
let started = false;
let closed = false;
let readyTimer = null;
const stderrTailLogger = createLineLogger((line) => {
if (!shouldSuppressFfmpegLogLine(line)) {
stderr = appendTail(stderr, `${line}\n`);
}
});
const playback = {
get closed() {
return closed;
},
attachAudio(request, response) {
if (closed) {
response.status(410).end();
return;
}
if (audioResponse && !audioResponse.writableEnded) {
response.status(409).json({ error: 'Audio stream already attached for this session.' });
return;
}
audioResponse = response;
response.set({
'Cache-Control': 'no-store',
'Content-Type': 'audio/mpeg',
'X-Content-Type-Options': 'nosniff',
});
response.flushHeaders();
const cleanup = once(() => {
if (!closed) {
stop('client_disconnect');
}
});
request.on('close', cleanup);
response.on('close', cleanup);
maybeStart();
},
attachFrames(nextWebsocket) {
if (closed) {
nextWebsocket.close(1011, 'Playback closed');
return;
}
if (websocket && websocket.readyState === WebSocket.OPEN) {
nextWebsocket.close(1013, 'Frame stream already attached for this session');
return;
}
websocket = nextWebsocket;
sendJson(websocket, {
type: 'ready',
codec: 'jpeg',
fps,
quality,
width,
});
websocket.on('close', () => {
if (!closed) {
stop('client_disconnect');
}
});
websocket.on('error', () => {
if (!closed) {
stop('websocket_error');
}
});
maybeStart();
},
};
readyTimer = setTimeout(() => {
if (!started && !closed) {
stop('consumer_attach_timeout');
}
}, PLAYBACK_READY_TIMEOUT_MS).unref();
return playback;
function maybeStart() {
if (started || closed || !audioResponse || !isWebSocketOpen(websocket)) {
return;
}
started = true;
clearReadyTimer();
const source = createSourceInput(session.id, 'playback');
releaseSource = once(source.release);
ffmpeg = spawnFfmpeg(buildPlaybackArgs(session, source.url), ['ignore', 'pipe', 'pipe', 'pipe']);
logger = createFfmpegLogger('playback', session.id, ffmpeg.pid);
logger.start();
const frameStream = ffmpeg.stdio[3];
ffmpeg.stdout.on('data', handleAudioData);
ffmpeg.stdout.on('error', (error) => {
logWarn(`audio output error kind=${label} error=${oneLine(error.message)}`);
stop('audio_output_error');
});
frameStream.on('error', (error) => {
logWarn(`frame output error kind=${label} error=${oneLine(error.message)}`);
stop('frame_output_error');
});
frameStream.on('data', handleFrameData);
ffmpeg.stderr.on('data', (chunk) => {
stderrTailLogger.write(chunk);
logger.stderr(chunk);
});
ffmpeg.on('error', (error) => {
stopReason = 'start_error';
logger.error(error);
finishPlayback(1011, `Failed to start ffmpeg: ${error.message}`, null, null);
});
ffmpeg.on('close', (code, signal) => {
stderrTailLogger.flush();
logger.close(code, signal, stopReason, stderr);
finishPlayback(1000, 'ffmpeg exited', code, signal);
});
}
function handleAudioData(chunk) {
if (closed) {
return;
}
if (!audioResponse || audioResponse.writableEnded) {
stop('audio_response_closed');
return;
}
audioQueue.push(chunk);
audioQueueBytes += chunk.length;
audioQueuePeakBytes = Math.max(audioQueuePeakBytes, audioQueueBytes);
if (audioQueueBytes > MAX_AUDIO_QUEUE_BYTES) {
logWarn(`audio queue overflow kind=${label} bytes=${audioQueueBytes} maxBytes=${MAX_AUDIO_QUEUE_BYTES}`);
stop('audio_queue_overflow');
return;
}
flushAudioQueue();
}
function flushAudioQueue() {
if (closed || !audioResponse || audioResponse.writableEnded) {
return;
}
while (audioQueue.length > 0) {
const chunk = audioQueue.shift();
audioQueueBytes -= chunk.length;
let canContinue = false;
try {
canContinue = audioResponse.write(chunk);
} catch (error) {
logWarn(`audio response write failed kind=${label} error=${oneLine(error.message)}`);
stop('audio_response_write_error');
return;
}
if (!canContinue) {
audioBackpressureCount += 1;
waitForAudioDrain();
return;
}
}
}
function waitForAudioDrain() {
if (audioDrainPending || !audioResponse || audioResponse.writableEnded) {
return;
}
audioDrainPending = true;
audioResponse.once('drain', () => {
audioDrainPending = false;
audioDrainCount += 1;
flushAudioQueue();
});
}
function handleFrameData(chunk) {
frameBuffer = Buffer.concat([frameBuffer, chunk]);
for (;;) {
const start = frameBuffer.indexOf(JPEG_SOI);
if (start === -1) {
frameBuffer = Buffer.alloc(0);
return;
}
const end = frameBuffer.indexOf(JPEG_EOI, start + JPEG_SOI.length);
if (end === -1) {
frameBuffer = start === 0 ? frameBuffer : frameBuffer.subarray(start);
return;
}
const jpeg = frameBuffer.subarray(start, end + JPEG_EOI.length);
frameBuffer = frameBuffer.subarray(end + JPEG_EOI.length);
const timestamp = frameIndex / fps;
frameIndex += 1;
if (!isWebSocketOpen(websocket)) {
return;
}
if (websocket.bufferedAmount > MAX_WS_BUFFER_BYTES) {
skippedFrames += 1;
continue;
}
const packet = Buffer.allocUnsafe(8 + jpeg.length);
packet.writeDoubleLE(timestamp, 0);
jpeg.copy(packet, 8);
websocket.send(packet, { binary: true }, (error) => {
if (error && !closed) {
stop('websocket_send_error');
}
});
}
}
function stop(reason) {
stopReason = reason;
if (ffmpeg && ffmpeg.exitCode === null && ffmpeg.signalCode === null) {
stopProcess(ffmpeg);
return;
}
finishPlayback(1000, 'playback stopped', null, null);
}
function finishPlayback(websocketCode, websocketReason, ffmpegCode, ffmpegSignal) {
if (closed) {
return;
}
closed = true;
clearReadyTimer();
releaseSource();
playbacks.delete(session.id);
audioQueue = [];
audioQueueBytes = 0;
if (audioResponse && !audioResponse.writableEnded) {
audioResponse.end();
}
if (isWebSocketOpen(websocket)) {
sendJson(websocket, {
type: 'end',
code: ffmpegCode,
signal: ffmpegSignal,
skippedFrames,
message: summarizeFfmpegExit(ffmpegCode, ffmpegSignal, stderr),
});
websocket.close(websocketCode, websocketReason);
}
logInfo(`playback closed kind=${label} reason=${stopReason} frames=${frameIndex} skippedFrames=${skippedFrames} audioQueuePeakBytes=${audioQueuePeakBytes} audioBackpressureCount=${audioBackpressureCount} audioDrainCount=${audioDrainCount}`);
}
function clearReadyTimer() {
if (readyTimer) {
clearTimeout(readyTimer);
readyTimer = null;
}
}
}
function isWebSocketOpen(websocket) {
return websocket?.readyState === WebSocket.OPEN;
}
function createSourceInput(sessionId, kind) {
const token = randomUUID();
sourceTokens.set(token, { sessionId, kind, createdAt: Date.now() });
return {
url: `http://127.0.0.1:${getListeningPort()}/_source/${token}`,
release: () => sourceTokens.delete(token),
};
}
function getListeningPort() {
const address = server.address();
return typeof address === 'object' && address ? address.port : PORT;
}
function buildPlaybackArgs(session, inputUrl) {
const { fps, quality, width } = session.options;
const videoFilter = `fps=${fps},scale=w='min(${width},iw)':h=-2:flags=bicubic:out_range=pc,format=yuvj420p`;
return [
'-hide_banner',
'-nostdin',
'-loglevel',
FFMPEG_LOG_LEVEL,
'-nostats',
'-seekable',
FFMPEG_INPUT_SEEKABLE,
'-re',
'-i',
inputUrl,
'-map',
'0:a:0?',
'-vn',
'-ac',
'2',
'-ar',
'48000',
'-codec:a',
'libmp3lame',
'-b:a',
session.options.audioBitrate,
'-f',
'mp3',
'pipe:1',
'-map',
'0:v:0',
'-an',
'-vf',
videoFilter,
'-codec:v',
'mjpeg',
'-pix_fmt',
'yuvj420p',
'-color_range',
'pc',
'-q:v',
String(quality),
'-f',
'image2pipe',
'pipe:3',
];
}
function spawnFfmpeg(args, stdio = ['ignore', 'pipe', 'pipe']) {
return spawn(FFMPEG_PATH, args, {
stdio,
});
}
function createFfmpegLogger(kind, sessionId, pid) {
const startedAt = Date.now();
const label = `${kind}:${shortId(sessionId)}`;
const lineLogger = createLineLogger((line) => {
if (shouldSuppressFfmpegLogLine(line)) {
return;
}
logWarn(`ffmpeg stderr kind=${label} pid=${pid ?? 'unknown'} line="${oneLine(redactSecrets(line))}"`);
});
return {
start() {
logInfo(`ffmpeg started kind=${label} pid=${pid ?? 'unknown'} loglevel=${FFMPEG_LOG_LEVEL}`);
},
stderr(chunk) {
lineLogger.write(chunk);
},
error(error) {
logWarn(`ffmpeg start error kind=${label} pid=${pid ?? 'unknown'} error=${oneLine(error.message)}`);
},
close(code, signal, reason, stderr) {
lineLogger.flush();
const level = reason === 'client_disconnect' || code === 0 || signal === 'SIGTERM' ? 'info' : 'warn';
const tail = redactSecrets(stderr).trim();
const tailText = tail ? ` stderrTail="${oneLine(tail)}"` : '';
log(level, `ffmpeg exited kind=${label} pid=${pid ?? 'unknown'} code=${code ?? 'null'} signal=${signal ?? 'none'} reason=${reason} durationMs=${Date.now() - startedAt}${tailText}`);
},
};
}
function shouldSuppressFfmpegLogLine(line) {
return line.includes('deprecated pixel format used, make sure you did set range correctly');
}
function createLineLogger(callback) {
let buffer = '';
return {
write(chunk) {
buffer += chunk.toString('utf8');
const lines = buffer.split(/\r?\n/);
buffer = lines.pop() ?? '';
for (const line of lines) {
if (line.trim()) {
callback(line);
}
}
},
flush() {
if (buffer.trim()) {
callback(buffer);
}
buffer = '';
},
};
}
function copyUpstreamHeaders(upstreamHeaders, response) {
for (const header of [
'accept-ranges',
'content-length',
'content-range',
'content-type',
'etag',
'last-modified',
]) {
const value = upstreamHeaders.get(header);
if (value) {
response.setHeader(header, value);
}
}
response.setHeader('cache-control', 'no-store');
}
function sendJson(websocket, payload) {
if (websocket.readyState === WebSocket.OPEN) {
websocket.send(JSON.stringify(payload));
}
}
function appendTail(current, chunk) {
const next = current + chunk.toString('utf8');
return next.length > 4000 ? next.slice(-4000) : next;
}
function shortId(id) {
return id.slice(0, 8);
}
function oneLine(value) {
return String(value).replace(/\s+/g, ' ').trim().slice(0, 1200);
}
function logInfo(message) {
log('info', message);
}
function logWarn(message) {
log('warn', message);
}
function log(level, message) {
const output = `${new Date().toISOString()} ${message}`;
if (level === 'warn') {
console.warn(output);
return;
}
console.log(output);
}
function summarizeFfmpegExit(code, signal, stderr) {
if (signal) {
return `ffmpeg stopped by ${signal}.`;
}
if (code === 0 || code === null) {
return 'Frame stream ended.';
}
const detail = redactSecrets(stderr).trim();
return detail ? `ffmpeg exited with code ${code}: ${detail}` : `ffmpeg exited with code ${code}.`;
}
function redactSecrets(text) {
return text.replace(/([?&](?:api_key|apikey|access_token|token|key)=)[^&\s]+/gi, '$1[redacted]');
}
function stopProcess(child) {
if (!child || child.exitCode !== null || child.signalCode !== null) {
return;
}
child.kill('SIGTERM');
setTimeout(() => {
if (child.exitCode === null && child.signalCode === null) {
child.kill('SIGKILL');
}
}, 1500).unref();
}
function once(callback) {
let called = false;
return (...args) => {
if (called) {
return;
}
called = true;
callback(...args);
};
}