Improve frame dropping under backpressure
This commit is contained in:
@@ -657,7 +657,7 @@ function trimPendingFrameQueue() {
|
||||
const overflow = state.pendingFrames.length - maxQueuedFrames;
|
||||
|
||||
if (overflow > 0) {
|
||||
state.pendingFrames.splice(maxQueuedFrames, overflow);
|
||||
state.pendingFrames.splice(0, overflow);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -671,7 +671,7 @@ function trimFrameQueue() {
|
||||
return;
|
||||
}
|
||||
|
||||
const removed = state.frames.splice(maxQueuedFrames, overflow);
|
||||
const removed = state.frames.splice(0, overflow);
|
||||
|
||||
for (const frame of removed) {
|
||||
releaseImage(frame.bitmap);
|
||||
|
||||
170
server/index.js
170
server/index.js
@@ -818,6 +818,15 @@ function streamSplitFrames(websocket, session) {
|
||||
let skippedFrames = 0;
|
||||
let stopReason = 'process_exit';
|
||||
let serverClosingWebSocket = false;
|
||||
const frameSender = createLatestFrameSender(websocket, {
|
||||
onError: () => {
|
||||
stopReason = 'websocket_send_error';
|
||||
stopProcess(ffmpeg);
|
||||
},
|
||||
onSkip: () => {
|
||||
skippedFrames += 1;
|
||||
},
|
||||
});
|
||||
const frameParser = createJpegFrameParser(handleJpegFrame);
|
||||
|
||||
logger.start();
|
||||
@@ -898,19 +907,12 @@ function streamSplitFrames(websocket, session) {
|
||||
const timestamp = frameIndex / fps;
|
||||
frameIndex += 1;
|
||||
|
||||
const sendResult = sendFramePacket(websocket, timestamp, jpeg, () => {
|
||||
stopReason = 'websocket_send_error';
|
||||
stopProcess(ffmpeg);
|
||||
});
|
||||
const sendResult = frameSender.send(timestamp, jpeg);
|
||||
|
||||
if (sendResult === 'closed') {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (sendResult === 'skipped') {
|
||||
skippedFrames += 1;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@@ -940,6 +942,7 @@ function createRelayPlayback(session) {
|
||||
let frameExitCode = null;
|
||||
let frameExitSignal = null;
|
||||
let frameEndSent = false;
|
||||
let frameSender = null;
|
||||
let audioResponseFinished = false;
|
||||
let serverClosingWebSocket = false;
|
||||
let readyTimer = null;
|
||||
@@ -1000,6 +1003,16 @@ function createRelayPlayback(session) {
|
||||
}
|
||||
|
||||
websocket = nextWebsocket;
|
||||
frameSender = createLatestFrameSender(websocket, {
|
||||
onError: (error) => {
|
||||
if (error && !closed) {
|
||||
stop('websocket_send_error');
|
||||
}
|
||||
},
|
||||
onSkip: () => {
|
||||
skippedFrames += 1;
|
||||
},
|
||||
});
|
||||
|
||||
sendJson(websocket, {
|
||||
type: 'ready',
|
||||
@@ -1174,20 +1187,12 @@ function createRelayPlayback(session) {
|
||||
const timestamp = frameIndex / fps;
|
||||
frameIndex += 1;
|
||||
|
||||
const sendResult = sendFramePacket(websocket, timestamp, jpeg, (error) => {
|
||||
if (error && !closed) {
|
||||
stop('websocket_send_error');
|
||||
}
|
||||
});
|
||||
const sendResult = frameSender?.send(timestamp, jpeg) ?? 'closed';
|
||||
|
||||
if (sendResult === 'closed') {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (sendResult === 'skipped') {
|
||||
skippedFrames += 1;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -1304,6 +1309,7 @@ function createPlayback(session) {
|
||||
let started = false;
|
||||
let closed = false;
|
||||
let readyTimer = null;
|
||||
let frameSender = null;
|
||||
const stderrTail = createFfmpegStderrTail();
|
||||
const frameParser = createJpegFrameParser(handleJpegFrame);
|
||||
|
||||
@@ -1356,6 +1362,16 @@ function createPlayback(session) {
|
||||
}
|
||||
|
||||
websocket = nextWebsocket;
|
||||
frameSender = createLatestFrameSender(websocket, {
|
||||
onError: (error) => {
|
||||
if (error && !closed) {
|
||||
stop('websocket_send_error');
|
||||
}
|
||||
},
|
||||
onSkip: () => {
|
||||
skippedFrames += 1;
|
||||
},
|
||||
});
|
||||
|
||||
sendJson(websocket, {
|
||||
type: 'ready',
|
||||
@@ -1509,20 +1525,12 @@ function createPlayback(session) {
|
||||
const timestamp = frameIndex / fps;
|
||||
frameIndex += 1;
|
||||
|
||||
const sendResult = sendFramePacket(websocket, timestamp, jpeg, (error) => {
|
||||
if (error && !closed) {
|
||||
stop('websocket_send_error');
|
||||
}
|
||||
});
|
||||
const sendResult = frameSender?.send(timestamp, jpeg) ?? 'closed';
|
||||
|
||||
if (sendResult === 'closed') {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (sendResult === 'skipped') {
|
||||
skippedFrames += 1;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -1670,26 +1678,104 @@ function createJpegFrameParser(onFrame) {
|
||||
}
|
||||
}
|
||||
|
||||
function sendFramePacket(websocket, timestamp, jpeg, onError) {
|
||||
if (!isWebSocketOpen(websocket)) {
|
||||
return 'closed';
|
||||
}
|
||||
function createLatestFrameSender(websocket, { onError = () => {}, onSkip = () => {} } = {}) {
|
||||
let sending = false;
|
||||
let latestFrame = null;
|
||||
let pumpTimer = null;
|
||||
|
||||
const packetBytes = 8 + getJpegFrameByteLength(jpeg);
|
||||
return {
|
||||
send(timestamp, jpeg) {
|
||||
if (!isWebSocketOpen(websocket)) {
|
||||
return 'closed';
|
||||
}
|
||||
|
||||
if (websocket.bufferedAmount > 0 && websocket.bufferedAmount + packetBytes > MAX_WS_BUFFER_BYTES) {
|
||||
return 'skipped';
|
||||
}
|
||||
const packetBytes = 8 + getJpegFrameByteLength(jpeg);
|
||||
|
||||
const packet = Buffer.allocUnsafe(packetBytes);
|
||||
packet.writeDoubleLE(timestamp, 0);
|
||||
copyJpegFrame(jpeg, packet, 8);
|
||||
websocket.send(packet, { binary: true, compress: false }, (error) => {
|
||||
if (error) {
|
||||
onError(error);
|
||||
if (sending || isWebSocketOverFrameBudget(websocket, packetBytes)) {
|
||||
queueLatestFrame(timestamp, jpeg);
|
||||
return 'queued';
|
||||
}
|
||||
|
||||
return sendNow(timestamp, jpeg, packetBytes);
|
||||
},
|
||||
};
|
||||
|
||||
function queueLatestFrame(timestamp, jpeg) {
|
||||
if (latestFrame) {
|
||||
onSkip();
|
||||
}
|
||||
});
|
||||
return 'sent';
|
||||
|
||||
latestFrame = { timestamp, jpeg };
|
||||
schedulePump();
|
||||
}
|
||||
|
||||
function sendNow(timestamp, jpeg, packetBytes = 8 + getJpegFrameByteLength(jpeg)) {
|
||||
if (!isWebSocketOpen(websocket)) {
|
||||
return 'closed';
|
||||
}
|
||||
|
||||
const packet = Buffer.allocUnsafe(packetBytes);
|
||||
packet.writeDoubleLE(timestamp, 0);
|
||||
copyJpegFrame(jpeg, packet, 8);
|
||||
sending = true;
|
||||
|
||||
try {
|
||||
websocket.send(packet, { binary: true, compress: false }, (error) => {
|
||||
sending = false;
|
||||
|
||||
if (error) {
|
||||
onError(error);
|
||||
return;
|
||||
}
|
||||
|
||||
pumpLatestFrame();
|
||||
});
|
||||
} catch (error) {
|
||||
sending = false;
|
||||
onError(error);
|
||||
return 'closed';
|
||||
}
|
||||
|
||||
return 'sent';
|
||||
}
|
||||
|
||||
function pumpLatestFrame() {
|
||||
if (!latestFrame || sending) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!isWebSocketOpen(websocket)) {
|
||||
latestFrame = null;
|
||||
return;
|
||||
}
|
||||
|
||||
const packetBytes = 8 + getJpegFrameByteLength(latestFrame.jpeg);
|
||||
|
||||
if (isWebSocketOverFrameBudget(websocket, packetBytes)) {
|
||||
schedulePump();
|
||||
return;
|
||||
}
|
||||
|
||||
const frame = latestFrame;
|
||||
latestFrame = null;
|
||||
sendNow(frame.timestamp, frame.jpeg, packetBytes);
|
||||
}
|
||||
|
||||
function schedulePump() {
|
||||
if (pumpTimer) {
|
||||
return;
|
||||
}
|
||||
|
||||
pumpTimer = setTimeout(() => {
|
||||
pumpTimer = null;
|
||||
pumpLatestFrame();
|
||||
}, 10);
|
||||
pumpTimer.unref?.();
|
||||
}
|
||||
}
|
||||
|
||||
function isWebSocketOverFrameBudget(websocket, packetBytes) {
|
||||
return websocket.bufferedAmount > 0 && websocket.bufferedAmount + packetBytes > MAX_WS_BUFFER_BYTES;
|
||||
}
|
||||
|
||||
function getJpegFrameByteLength(jpeg) {
|
||||
|
||||
Reference in New Issue
Block a user