Improve frame sync telemetry

This commit is contained in:
2026-06-11 10:06:56 -07:00
parent 7655d7aaba
commit de0307539c
2 changed files with 425 additions and 13 deletions

View File

@@ -34,6 +34,8 @@ const FAVORITES_PATH = process.env.FAVORITES_PATH ?? path.join(__dirname, '..',
const FAVORITES_LIMIT = clampInteger(process.env.FAVORITES_LIMIT, 50, 1, 200);
const SESSION_TTL_MS = 60 * 60 * 1000;
const PLAYBACK_READY_TIMEOUT_MS = 15 * 1000;
const CLIENT_CLOCK_FRAME_LATE_GRACE_SECONDS = 0.25;
const FRAME_CONDITION_LOG_INTERVAL_MS = 5000;
const MAX_WS_BUFFER_BYTES = clampInteger(process.env.MAX_WS_BUFFER_BYTES, 2 * 1024 * 1024, 128 * 1024, 64 * 1024 * 1024);
const MAX_AUDIO_QUEUE_BYTES = clampInteger(process.env.MAX_AUDIO_QUEUE_BYTES, 4 * 1024 * 1024, 256 * 1024, 128 * 1024 * 1024);
const MAX_RELAY_BRANCH_QUEUE_BYTES = clampInteger(process.env.MAX_RELAY_BRANCH_QUEUE_BYTES, 8 * 1024 * 1024, 512 * 1024, 256 * 1024 * 1024);
@@ -818,6 +820,8 @@ function streamSplitFrames(websocket, session) {
let skippedFrames = 0;
let stopReason = 'process_exit';
let serverClosingWebSocket = false;
let clientAudioTime = null;
const frameConditions = createFrameConditionLogger(label);
const frameSender = createLatestFrameSender(websocket, {
onError: () => {
stopReason = 'websocket_send_error';
@@ -826,6 +830,9 @@ function streamSplitFrames(websocket, session) {
onSkip: () => {
skippedFrames += 1;
},
onQueue: (event) => {
frameConditions.senderQueue(event);
},
});
const frameParser = createJpegFrameParser(handleJpegFrame);
@@ -883,6 +890,7 @@ function streamSplitFrames(websocket, session) {
}
logInfo(`frames closed kind=${label} reason=${stopReason} frames=${frameIndex} skippedFrames=${skippedFrames}`);
frameConditions.flush('close', true);
});
websocket.on('close', () => {
@@ -899,6 +907,10 @@ function streamSplitFrames(websocket, session) {
stopProcess(ffmpeg);
});
websocket.on('message', (data, isBinary) => {
clientAudioTime = handleFrameClientMessage(data, isBinary, label, clientAudioTime);
});
function handleFrameData(chunk) {
frameParser.write(chunk);
}
@@ -907,6 +919,12 @@ function streamSplitFrames(websocket, session) {
const timestamp = frameIndex / fps;
frameIndex += 1;
if (isFrameLateForClient(timestamp, clientAudioTime)) {
skippedFrames += 1;
frameConditions.clientClockSkip(timestamp, clientAudioTime);
return true;
}
const sendResult = frameSender.send(timestamp, jpeg);
if (sendResult === 'closed') {
@@ -943,9 +961,11 @@ function createRelayPlayback(session) {
let frameExitSignal = null;
let frameEndSent = false;
let frameSender = null;
let clientAudioTime = null;
let audioResponseFinished = false;
let serverClosingWebSocket = false;
let readyTimer = null;
const frameConditions = createFrameConditionLogger(label);
const audioStderr = createFfmpegStderrTail();
const frameStderr = createFfmpegStderrTail();
const frameParser = createJpegFrameParser(handleJpegFrame);
@@ -1012,6 +1032,9 @@ function createRelayPlayback(session) {
onSkip: () => {
skippedFrames += 1;
},
onQueue: (event) => {
frameConditions.senderQueue(event);
},
});
sendJson(websocket, {
@@ -1037,6 +1060,10 @@ function createRelayPlayback(session) {
}
});
websocket.on('message', (data, isBinary) => {
clientAudioTime = handleFrameClientMessage(data, isBinary, label, clientAudioTime);
});
maybeStart();
},
};
@@ -1187,6 +1214,12 @@ function createRelayPlayback(session) {
const timestamp = frameIndex / fps;
frameIndex += 1;
if (isFrameLateForClient(timestamp, clientAudioTime)) {
skippedFrames += 1;
frameConditions.clientClockSkip(timestamp, clientAudioTime);
return true;
}
const sendResult = frameSender?.send(timestamp, jpeg) ?? 'closed';
if (sendResult === 'closed') {
@@ -1278,6 +1311,7 @@ function createRelayPlayback(session) {
}
logInfo(`relay closed kind=${label} reason=${stopReason} sourceBytes=${sourceBytes} frames=${frameIndex} skippedFrames=${skippedFrames} audioInputPeakBytes=${audioInput?.peakBytes ?? 0} frameInputPeakBytes=${frameInput?.peakBytes ?? 0}`);
frameConditions.flush('close', true);
}
function clearReadyTimer() {
@@ -1310,6 +1344,8 @@ function createPlayback(session) {
let closed = false;
let readyTimer = null;
let frameSender = null;
let clientAudioTime = null;
const frameConditions = createFrameConditionLogger(label);
const stderrTail = createFfmpegStderrTail();
const frameParser = createJpegFrameParser(handleJpegFrame);
@@ -1371,6 +1407,9 @@ function createPlayback(session) {
onSkip: () => {
skippedFrames += 1;
},
onQueue: (event) => {
frameConditions.senderQueue(event);
},
});
sendJson(websocket, {
@@ -1396,6 +1435,10 @@ function createPlayback(session) {
}
});
websocket.on('message', (data, isBinary) => {
clientAudioTime = handleFrameClientMessage(data, isBinary, label, clientAudioTime);
});
maybeStart();
},
};
@@ -1525,6 +1568,12 @@ function createPlayback(session) {
const timestamp = frameIndex / fps;
frameIndex += 1;
if (isFrameLateForClient(timestamp, clientAudioTime)) {
skippedFrames += 1;
frameConditions.clientClockSkip(timestamp, clientAudioTime);
return true;
}
const sendResult = frameSender?.send(timestamp, jpeg) ?? 'closed';
if (sendResult === 'closed') {
@@ -1575,6 +1624,7 @@ function createPlayback(session) {
}
logInfo(`playback closed kind=${label} reason=${stopReason} frames=${frameIndex} skippedFrames=${skippedFrames} audioQueuePeakBytes=${audioQueuePeakBytes} audioBackpressureCount=${audioBackpressureCount} audioDrainCount=${audioDrainCount}`);
frameConditions.flush('close', true);
}
function clearReadyTimer() {
@@ -1678,7 +1728,7 @@ function createJpegFrameParser(onFrame) {
}
}
function createLatestFrameSender(websocket, { onError = () => {}, onSkip = () => {} } = {}) {
function createLatestFrameSender(websocket, { onError = () => {}, onSkip = () => {}, onQueue = () => {} } = {}) {
let sending = false;
let latestFrame = null;
let pumpTimer = null;
@@ -1691,8 +1741,21 @@ function createLatestFrameSender(websocket, { onError = () => {}, onSkip = () =>
const packetBytes = 8 + getJpegFrameByteLength(jpeg);
if (sending || isWebSocketOverFrameBudget(websocket, packetBytes)) {
queueLatestFrame(timestamp, jpeg);
if (sending) {
queueLatestFrame(timestamp, jpeg, {
reason: 'send_in_progress',
packetBytes,
bufferedAmount: websocket.bufferedAmount,
});
return 'queued';
}
if (isWebSocketOverFrameBudget(websocket, packetBytes)) {
queueLatestFrame(timestamp, jpeg, {
reason: 'websocket_buffer_budget',
packetBytes,
bufferedAmount: websocket.bufferedAmount,
});
return 'queued';
}
@@ -1700,12 +1763,15 @@ function createLatestFrameSender(websocket, { onError = () => {}, onSkip = () =>
},
};
function queueLatestFrame(timestamp, jpeg) {
if (latestFrame) {
function queueLatestFrame(timestamp, jpeg, event) {
const replacing = Boolean(latestFrame);
if (replacing) {
onSkip();
}
latestFrame = { timestamp, jpeg };
onQueue({ ...event, replacing });
schedulePump();
}
@@ -1774,6 +1840,198 @@ function createLatestFrameSender(websocket, { onError = () => {}, onSkip = () =>
}
}
function handleFrameClientMessage(data, isBinary, label, currentClientAudioTime) {
const message = parseFrameClientMessage(data, isBinary);
if (!message) {
return currentClientAudioTime;
}
if (message.type === 'clock') {
return message.currentTime;
}
if (message.type === 'telemetry') {
logClientTelemetry(label, message);
}
return currentClientAudioTime;
}
function parseFrameClientMessage(data, isBinary) {
if (isBinary) {
return null;
}
let message;
try {
const raw = typeof data === 'string' ? data : data.toString('utf8');
message = JSON.parse(raw);
} catch {
return null;
}
if (message?.type === 'clock') {
const currentTime = Number(message.currentTime);
if (!Number.isFinite(currentTime)) {
return null;
}
return {
type: 'clock',
currentTime: clampNumber(currentTime, 0, Number.MAX_SAFE_INTEGER),
};
}
if (message?.type === 'telemetry') {
return {
type: 'telemetry',
reason: sanitizeTelemetryText(message.reason, 'periodic'),
currentTime: finiteTelemetryNumber(message.currentTime),
paused: Boolean(message.paused),
readyState: finiteTelemetryNumber(message.readyState),
pendingFrames: finiteTelemetryNumber(message.pendingFrames),
decodedFrames: finiteTelemetryNumber(message.decodedFrames),
paintedFrames: finiteTelemetryNumber(message.paintedFrames),
lastFramePacketAgeMs: finiteTelemetryNumber(message.lastFramePacketAgeMs),
lastFramePaintAgeMs: finiteTelemetryNumber(message.lastFramePaintAgeMs),
hidden: Boolean(message.hidden),
online: Boolean(message.online),
counters: sanitizeTelemetryMap(message.counters),
max: sanitizeTelemetryMap(message.max),
};
}
return null;
}
function isFrameLateForClient(timestamp, clientAudioTime) {
return Number.isFinite(clientAudioTime) && timestamp < clientAudioTime - CLIENT_CLOCK_FRAME_LATE_GRACE_SECONDS;
}
function createFrameConditionLogger(label) {
let counters = {};
let max = {};
let lastLoggedAt = 0;
return {
senderQueue(event) {
incrementCounter(`senderQueue_${event.reason}`);
if (event.replacing) {
incrementCounter('senderReplacedLatest');
}
recordMax('wsBufferedBytes', event.bufferedAmount);
recordMax('framePacketBytes', event.packetBytes);
flush('periodic');
},
clientClockSkip(timestamp, clientAudioTime) {
incrementCounter('clientClockSkips');
recordMax('clientClockLagMs', (clientAudioTime - timestamp) * 1000);
flush('periodic');
},
flush,
};
function incrementCounter(name, count = 1) {
counters[name] = (counters[name] ?? 0) + count;
}
function recordMax(name, value) {
if (!Number.isFinite(value)) {
return;
}
max[name] = Math.max(max[name] ?? 0, Math.round(value));
}
function flush(reason = 'periodic', force = false) {
const now = Date.now();
if (!force && now - lastLoggedAt < FRAME_CONDITION_LOG_INTERVAL_MS) {
return;
}
if (Object.keys(counters).length === 0 && Object.keys(max).length === 0) {
return;
}
logInfo(`frame conditions kind=${label} reason=${reason} counters=${formatTelemetryMap(counters)} max=${formatTelemetryMap(max)}`);
counters = {};
max = {};
lastLoggedAt = now;
}
}
function logClientTelemetry(label, telemetry) {
logInfo(
`client telemetry kind=${label}` +
` reason=${telemetry.reason}` +
` currentTime=${formatTelemetryNumber(telemetry.currentTime)}` +
` paused=${telemetry.paused}` +
` readyState=${formatTelemetryNumber(telemetry.readyState)}` +
` pendingFrames=${formatTelemetryNumber(telemetry.pendingFrames)}` +
` decodedFrames=${formatTelemetryNumber(telemetry.decodedFrames)}` +
` paintedFrames=${formatTelemetryNumber(telemetry.paintedFrames)}` +
` lastFramePacketAgeMs=${formatTelemetryNumber(telemetry.lastFramePacketAgeMs)}` +
` lastFramePaintAgeMs=${formatTelemetryNumber(telemetry.lastFramePaintAgeMs)}` +
` hidden=${telemetry.hidden}` +
` online=${telemetry.online}` +
` counters=${formatTelemetryMap(telemetry.counters)}` +
` max=${formatTelemetryMap(telemetry.max)}`,
);
}
function sanitizeTelemetryMap(value) {
if (!value || typeof value !== 'object' || Array.isArray(value)) {
return {};
}
const sanitized = {};
for (const [key, rawValue] of Object.entries(value).slice(0, 40)) {
const name = sanitizeTelemetryText(key, '').replace(/[^a-zA-Z0-9_.-]/g, '_').slice(0, 80);
const number = finiteTelemetryNumber(rawValue);
if (name && number !== null) {
sanitized[name] = number;
}
}
return sanitized;
}
function sanitizeTelemetryText(value, fallback) {
const text = typeof value === 'string' ? value : fallback;
return oneLine(text).replace(/"/g, '').slice(0, 120);
}
function finiteTelemetryNumber(value) {
const number = Number(value);
return Number.isFinite(number) ? number : null;
}
function formatTelemetryMap(value) {
const entries = Object.entries(value);
if (entries.length === 0) {
return 'none';
}
return entries.map(([key, entryValue]) => `${key}=${formatTelemetryNumber(entryValue)}`).join(',');
}
function formatTelemetryNumber(value) {
if (!Number.isFinite(value)) {
return 'null';
}
return Math.round(value * 1000) / 1000;
}
function isWebSocketOverFrameBudget(websocket, packetBytes) {
return websocket.bufferedAmount > 0 && websocket.bufferedAmount + packetBytes > MAX_WS_BUFFER_BYTES;
}