sequential frames support

- Server now has configurable MAX_WS_BUFFER_BYTES defaulting to 2097152, and skips JPEG frames
    when the WebSocket is backed up instead of queueing stale frames in ws (server/index.js:30,
    server/index.js:1439).
  - Browser frame handling now decodes frames sequentially, drops late frames against the audio
    clock, caps pending/decoded queues, and draws only the latest due frame per animation tick
    (public/app.js:280, public/app.js:381).
  - Relay/split normal EOF closes are no longer mislabeled as client_disconnect, which should
    make logs around ffmpeg decode warnings less misleading (server/index.js:797, server/
    index.js:1071).
  - Documented MAX_WS_BUFFER_BYTES in README, Compose, and AGENTS.
This commit is contained in:
2026-05-04 00:00:34 -07:00
parent a3429dee85
commit 13b1d768dc
5 changed files with 218 additions and 52 deletions

View File

@@ -27,7 +27,7 @@ const RECENT_URL_LIMIT = clampInteger(process.env.RECENT_URL_LIMIT, 12, 1, 50);
const SESSION_TTL_MS = 60 * 60 * 1000;
const METADATA_PROBE_TIMEOUT_MS = 8 * 1000;
const PLAYBACK_READY_TIMEOUT_MS = 15 * 1000;
const MAX_WS_BUFFER_BYTES = 12 * 1024 * 1024;
const MAX_WS_BUFFER_BYTES = clampInteger(process.env.MAX_WS_BUFFER_BYTES, 2 * 1024 * 1024, 128 * 1024, 64 * 1024 * 1024);
const MAX_AUDIO_QUEUE_BYTES = clampInteger(process.env.MAX_AUDIO_QUEUE_BYTES, 16 * 1024 * 1024, 256 * 1024, 128 * 1024 * 1024);
const MAX_RELAY_BRANCH_QUEUE_BYTES = clampInteger(process.env.MAX_RELAY_BRANCH_QUEUE_BYTES, 16 * 1024 * 1024, 512 * 1024, 256 * 1024 * 1024);
const RELAY_BRANCH_PAUSE_BYTES = Math.floor(MAX_RELAY_BRANCH_QUEUE_BYTES / 2);
@@ -551,6 +551,7 @@ function streamSplitAudio(request, response, session) {
const ffmpeg = spawnFfmpeg(worker.args);
const logger = createFfmpegLogger('audio', session.id, ffmpeg.pid);
const stderrTail = createFfmpegStderrTail();
let responseFinished = false;
let stopReason = 'process_exit';
logger.start();
@@ -592,7 +593,15 @@ function streamSplitAudio(request, response, session) {
}
});
response.on('finish', () => {
responseFinished = true;
});
const cleanup = once(() => {
if (responseFinished || response.writableEnded) {
return;
}
stopReason = 'client_disconnect';
stopProcess(ffmpeg);
});
@@ -612,6 +621,7 @@ function streamSplitFrames(websocket, session) {
let frameIndex = 0;
let skippedFrames = 0;
let stopReason = 'process_exit';
let serverClosingWebSocket = false;
logger.start();
@@ -662,6 +672,7 @@ function streamSplitFrames(websocket, session) {
skippedFrames,
message: summarizeFfmpegExit(code, signal, stderrTail.value),
});
serverClosingWebSocket = true;
websocket.close(1000, 'ffmpeg exited');
}
@@ -669,6 +680,10 @@ function streamSplitFrames(websocket, session) {
});
websocket.on('close', () => {
if (serverClosingWebSocket) {
return;
}
stopReason = 'client_disconnect';
stopProcess(ffmpeg);
});
@@ -705,20 +720,18 @@ function streamSplitFrames(websocket, session) {
return;
}
if (websocket.bufferedAmount > MAX_WS_BUFFER_BYTES) {
skippedFrames += 1;
continue;
const sendResult = sendFramePacket(websocket, timestamp, jpeg, () => {
stopReason = 'websocket_send_error';
stopProcess(ffmpeg);
});
if (sendResult === 'closed') {
return;
}
const packet = Buffer.allocUnsafe(8 + jpeg.length);
packet.writeDoubleLE(timestamp, 0);
jpeg.copy(packet, 8);
websocket.send(packet, { binary: true }, (error) => {
if (error) {
stopReason = 'websocket_send_error';
stopProcess(ffmpeg);
}
});
if (sendResult === 'skipped') {
skippedFrames += 1;
}
}
}
}
@@ -749,6 +762,8 @@ function createRelayPlayback(session) {
let frameExitCode = null;
let frameExitSignal = null;
let frameEndSent = false;
let audioResponseFinished = false;
let serverClosingWebSocket = false;
let readyTimer = null;
const audioStderr = createFfmpegStderrTail();
const frameStderr = createFfmpegStderrTail();
@@ -779,8 +794,12 @@ function createRelayPlayback(session) {
});
response.flushHeaders();
response.on('finish', () => {
audioResponseFinished = true;
});
const cleanup = once(() => {
if (!closed) {
if (!closed && !audioResponseFinished && !response.writableEnded) {
stop('client_disconnect');
}
});
@@ -814,7 +833,7 @@ function createRelayPlayback(session) {
});
websocket.on('close', () => {
if (!closed) {
if (!closed && !serverClosingWebSocket) {
stop('client_disconnect');
}
});
@@ -994,19 +1013,19 @@ function createRelayPlayback(session) {
return;
}
if (websocket.bufferedAmount > MAX_WS_BUFFER_BYTES) {
skippedFrames += 1;
continue;
}
const packet = Buffer.allocUnsafe(8 + jpeg.length);
packet.writeDoubleLE(timestamp, 0);
jpeg.copy(packet, 8);
websocket.send(packet, { binary: true }, (error) => {
const sendResult = sendFramePacket(websocket, timestamp, jpeg, (error) => {
if (error && !closed) {
stop('websocket_send_error');
}
});
if (sendResult === 'closed') {
return;
}
if (sendResult === 'skipped') {
skippedFrames += 1;
}
}
}
@@ -1062,6 +1081,7 @@ function createRelayPlayback(session) {
skippedFrames,
message: summarizeFfmpegExit(frameExitCode, frameExitSignal, frameStderr.value),
});
serverClosingWebSocket = true;
websocket.close(1000, 'ffmpeg exited');
}
@@ -1084,6 +1104,7 @@ function createRelayPlayback(session) {
}
if (isWebSocketOpen(websocket) && frameEndSent) {
serverClosingWebSocket = true;
websocket.close(websocketCode, websocketReason);
} else {
sendFrameEndIfNeeded();
@@ -1344,19 +1365,19 @@ function createPlayback(session) {
return;
}
if (websocket.bufferedAmount > MAX_WS_BUFFER_BYTES) {
skippedFrames += 1;
continue;
}
const packet = Buffer.allocUnsafe(8 + jpeg.length);
packet.writeDoubleLE(timestamp, 0);
jpeg.copy(packet, 8);
websocket.send(packet, { binary: true }, (error) => {
const sendResult = sendFramePacket(websocket, timestamp, jpeg, (error) => {
if (error && !closed) {
stop('websocket_send_error');
}
});
if (sendResult === 'closed') {
return;
}
if (sendResult === 'skipped') {
skippedFrames += 1;
}
}
}
@@ -1415,6 +1436,28 @@ function isWebSocketOpen(websocket) {
return websocket?.readyState === WebSocket.OPEN;
}
function sendFramePacket(websocket, timestamp, jpeg, onError) {
if (!isWebSocketOpen(websocket)) {
return 'closed';
}
const packetBytes = 8 + jpeg.length;
if (websocket.bufferedAmount > 0 && websocket.bufferedAmount + packetBytes > MAX_WS_BUFFER_BYTES) {
return 'skipped';
}
const packet = Buffer.allocUnsafe(packetBytes);
packet.writeDoubleLE(timestamp, 0);
jpeg.copy(packet, 8);
websocket.send(packet, { binary: true }, (error) => {
if (error) {
onError(error);
}
});
return 'sent';
}
function createSourceInput(sessionId, kind) {
const token = randomUUID();
sourceTokens.set(token, { sessionId, kind, createdAt: Date.now() });