From 7655d7aabaf1074842bba5e550ddd0eeea478e6d Mon Sep 17 00:00:00 2001 From: James Magahern Date: Thu, 11 Jun 2026 09:47:17 -0700 Subject: [PATCH] Improve frame dropping under backpressure --- public/app.js | 4 +- server/index.js | 170 ++++++++++++++++++++++++++++++++++++------------ 2 files changed, 130 insertions(+), 44 deletions(-) diff --git a/public/app.js b/public/app.js index f2911b5..22870e0 100644 --- a/public/app.js +++ b/public/app.js @@ -657,7 +657,7 @@ function trimPendingFrameQueue() { const overflow = state.pendingFrames.length - maxQueuedFrames; if (overflow > 0) { - state.pendingFrames.splice(maxQueuedFrames, overflow); + state.pendingFrames.splice(0, overflow); } } @@ -671,7 +671,7 @@ function trimFrameQueue() { return; } - const removed = state.frames.splice(maxQueuedFrames, overflow); + const removed = state.frames.splice(0, overflow); for (const frame of removed) { releaseImage(frame.bitmap); diff --git a/server/index.js b/server/index.js index 25c2579..a1c504a 100644 --- a/server/index.js +++ b/server/index.js @@ -818,6 +818,15 @@ function streamSplitFrames(websocket, session) { let skippedFrames = 0; let stopReason = 'process_exit'; let serverClosingWebSocket = false; + const frameSender = createLatestFrameSender(websocket, { + onError: () => { + stopReason = 'websocket_send_error'; + stopProcess(ffmpeg); + }, + onSkip: () => { + skippedFrames += 1; + }, + }); const frameParser = createJpegFrameParser(handleJpegFrame); logger.start(); @@ -898,19 +907,12 @@ function streamSplitFrames(websocket, session) { const timestamp = frameIndex / fps; frameIndex += 1; - const sendResult = sendFramePacket(websocket, timestamp, jpeg, () => { - stopReason = 'websocket_send_error'; - stopProcess(ffmpeg); - }); + const sendResult = frameSender.send(timestamp, jpeg); if (sendResult === 'closed') { return false; } - if (sendResult === 'skipped') { - skippedFrames += 1; - } - return true; } } @@ -940,6 +942,7 @@ function createRelayPlayback(session) { let frameExitCode = null; let frameExitSignal = null; let frameEndSent = false; + let frameSender = null; let audioResponseFinished = false; let serverClosingWebSocket = false; let readyTimer = null; @@ -1000,6 +1003,16 @@ function createRelayPlayback(session) { } websocket = nextWebsocket; + frameSender = createLatestFrameSender(websocket, { + onError: (error) => { + if (error && !closed) { + stop('websocket_send_error'); + } + }, + onSkip: () => { + skippedFrames += 1; + }, + }); sendJson(websocket, { type: 'ready', @@ -1174,20 +1187,12 @@ function createRelayPlayback(session) { const timestamp = frameIndex / fps; frameIndex += 1; - const sendResult = sendFramePacket(websocket, timestamp, jpeg, (error) => { - if (error && !closed) { - stop('websocket_send_error'); - } - }); + const sendResult = frameSender?.send(timestamp, jpeg) ?? 'closed'; if (sendResult === 'closed') { return false; } - if (sendResult === 'skipped') { - skippedFrames += 1; - } - return true; } @@ -1304,6 +1309,7 @@ function createPlayback(session) { let started = false; let closed = false; let readyTimer = null; + let frameSender = null; const stderrTail = createFfmpegStderrTail(); const frameParser = createJpegFrameParser(handleJpegFrame); @@ -1356,6 +1362,16 @@ function createPlayback(session) { } websocket = nextWebsocket; + frameSender = createLatestFrameSender(websocket, { + onError: (error) => { + if (error && !closed) { + stop('websocket_send_error'); + } + }, + onSkip: () => { + skippedFrames += 1; + }, + }); sendJson(websocket, { type: 'ready', @@ -1509,20 +1525,12 @@ function createPlayback(session) { const timestamp = frameIndex / fps; frameIndex += 1; - const sendResult = sendFramePacket(websocket, timestamp, jpeg, (error) => { - if (error && !closed) { - stop('websocket_send_error'); - } - }); + const sendResult = frameSender?.send(timestamp, jpeg) ?? 'closed'; if (sendResult === 'closed') { return false; } - if (sendResult === 'skipped') { - skippedFrames += 1; - } - return true; } @@ -1670,26 +1678,104 @@ function createJpegFrameParser(onFrame) { } } -function sendFramePacket(websocket, timestamp, jpeg, onError) { - if (!isWebSocketOpen(websocket)) { - return 'closed'; - } +function createLatestFrameSender(websocket, { onError = () => {}, onSkip = () => {} } = {}) { + let sending = false; + let latestFrame = null; + let pumpTimer = null; - const packetBytes = 8 + getJpegFrameByteLength(jpeg); + return { + send(timestamp, jpeg) { + if (!isWebSocketOpen(websocket)) { + return 'closed'; + } - if (websocket.bufferedAmount > 0 && websocket.bufferedAmount + packetBytes > MAX_WS_BUFFER_BYTES) { - return 'skipped'; - } + const packetBytes = 8 + getJpegFrameByteLength(jpeg); - const packet = Buffer.allocUnsafe(packetBytes); - packet.writeDoubleLE(timestamp, 0); - copyJpegFrame(jpeg, packet, 8); - websocket.send(packet, { binary: true, compress: false }, (error) => { - if (error) { - onError(error); + if (sending || isWebSocketOverFrameBudget(websocket, packetBytes)) { + queueLatestFrame(timestamp, jpeg); + return 'queued'; + } + + return sendNow(timestamp, jpeg, packetBytes); + }, + }; + + function queueLatestFrame(timestamp, jpeg) { + if (latestFrame) { + onSkip(); } - }); - return 'sent'; + + latestFrame = { timestamp, jpeg }; + schedulePump(); + } + + function sendNow(timestamp, jpeg, packetBytes = 8 + getJpegFrameByteLength(jpeg)) { + if (!isWebSocketOpen(websocket)) { + return 'closed'; + } + + const packet = Buffer.allocUnsafe(packetBytes); + packet.writeDoubleLE(timestamp, 0); + copyJpegFrame(jpeg, packet, 8); + sending = true; + + try { + websocket.send(packet, { binary: true, compress: false }, (error) => { + sending = false; + + if (error) { + onError(error); + return; + } + + pumpLatestFrame(); + }); + } catch (error) { + sending = false; + onError(error); + return 'closed'; + } + + return 'sent'; + } + + function pumpLatestFrame() { + if (!latestFrame || sending) { + return; + } + + if (!isWebSocketOpen(websocket)) { + latestFrame = null; + return; + } + + const packetBytes = 8 + getJpegFrameByteLength(latestFrame.jpeg); + + if (isWebSocketOverFrameBudget(websocket, packetBytes)) { + schedulePump(); + return; + } + + const frame = latestFrame; + latestFrame = null; + sendNow(frame.timestamp, frame.jpeg, packetBytes); + } + + function schedulePump() { + if (pumpTimer) { + return; + } + + pumpTimer = setTimeout(() => { + pumpTimer = null; + pumpLatestFrame(); + }, 10); + pumpTimer.unref?.(); + } +} + +function isWebSocketOverFrameBudget(websocket, packetBytes) { + return websocket.bufferedAmount > 0 && websocket.bufferedAmount + packetBytes > MAX_WS_BUFFER_BYTES; } function getJpegFrameByteLength(jpeg) {