better frame decoding on server

This commit is contained in:
2026-05-04 17:10:59 -07:00
parent 13b1d768dc
commit 81d9cfc1c2

View File

@@ -617,11 +617,11 @@ function streamSplitFrames(websocket, session) {
const stderrTail = createFfmpegStderrTail();
const { fps, quality, width } = session.options;
const label = `frames:${shortId(session.id)}`;
let frameBuffer = Buffer.alloc(0);
let frameIndex = 0;
let skippedFrames = 0;
let stopReason = 'process_exit';
let serverClosingWebSocket = false;
const frameParser = createJpegFrameParser(handleJpegFrame);
logger.start();
@@ -694,45 +694,27 @@ function streamSplitFrames(websocket, session) {
});
function handleFrameData(chunk) {
frameBuffer = Buffer.concat([frameBuffer, chunk]);
for (;;) {
const start = frameBuffer.indexOf(JPEG_SOI);
if (start === -1) {
frameBuffer = Buffer.alloc(0);
return;
frameParser.write(chunk);
}
const end = frameBuffer.indexOf(JPEG_EOI, start + JPEG_SOI.length);
if (end === -1) {
frameBuffer = start === 0 ? frameBuffer : frameBuffer.subarray(start);
return;
}
const jpeg = frameBuffer.subarray(start, end + JPEG_EOI.length);
frameBuffer = frameBuffer.subarray(end + JPEG_EOI.length);
function handleJpegFrame(jpeg) {
const timestamp = frameIndex / fps;
frameIndex += 1;
if (!isWebSocketOpen(websocket)) {
return;
}
const sendResult = sendFramePacket(websocket, timestamp, jpeg, () => {
stopReason = 'websocket_send_error';
stopProcess(ffmpeg);
});
if (sendResult === 'closed') {
return;
return false;
}
if (sendResult === 'skipped') {
skippedFrames += 1;
}
}
return true;
}
}
@@ -749,7 +731,6 @@ function createRelayPlayback(session) {
let frameLogger = null;
let sourceController = null;
let sourceBytes = 0;
let frameBuffer = Buffer.alloc(0);
let frameIndex = 0;
let skippedFrames = 0;
let stopReason = 'process_exit';
@@ -767,6 +748,7 @@ function createRelayPlayback(session) {
let readyTimer = null;
const audioStderr = createFfmpegStderrTail();
const frameStderr = createFfmpegStderrTail();
const frameParser = createJpegFrameParser(handleJpegFrame);
const playback = {
get closed() {
@@ -987,32 +969,13 @@ function createRelayPlayback(session) {
}
function handleFrameData(chunk) {
frameBuffer = Buffer.concat([frameBuffer, chunk]);
for (;;) {
const start = frameBuffer.indexOf(JPEG_SOI);
if (start === -1) {
frameBuffer = Buffer.alloc(0);
return;
frameParser.write(chunk);
}
const end = frameBuffer.indexOf(JPEG_EOI, start + JPEG_SOI.length);
if (end === -1) {
frameBuffer = start === 0 ? frameBuffer : frameBuffer.subarray(start);
return;
}
const jpeg = frameBuffer.subarray(start, end + JPEG_EOI.length);
frameBuffer = frameBuffer.subarray(end + JPEG_EOI.length);
function handleJpegFrame(jpeg) {
const timestamp = frameIndex / fps;
frameIndex += 1;
if (!isWebSocketOpen(websocket)) {
return;
}
const sendResult = sendFramePacket(websocket, timestamp, jpeg, (error) => {
if (error && !closed) {
stop('websocket_send_error');
@@ -1020,13 +983,14 @@ function createRelayPlayback(session) {
});
if (sendResult === 'closed') {
return;
return false;
}
if (sendResult === 'skipped') {
skippedFrames += 1;
}
}
return true;
}
function stop(reason) {
@@ -1136,7 +1100,6 @@ function createPlayback(session) {
let logger = null;
let releaseSource = () => {};
let stderr = '';
let frameBuffer = Buffer.alloc(0);
let frameIndex = 0;
let skippedFrames = 0;
let stopReason = 'process_exit';
@@ -1144,6 +1107,7 @@ function createPlayback(session) {
let closed = false;
let readyTimer = null;
const stderrTail = createFfmpegStderrTail();
const frameParser = createJpegFrameParser(handleJpegFrame);
const playback = {
get closed() {
@@ -1339,32 +1303,13 @@ function createPlayback(session) {
}
function handleFrameData(chunk) {
frameBuffer = Buffer.concat([frameBuffer, chunk]);
for (;;) {
const start = frameBuffer.indexOf(JPEG_SOI);
if (start === -1) {
frameBuffer = Buffer.alloc(0);
return;
frameParser.write(chunk);
}
const end = frameBuffer.indexOf(JPEG_EOI, start + JPEG_SOI.length);
if (end === -1) {
frameBuffer = start === 0 ? frameBuffer : frameBuffer.subarray(start);
return;
}
const jpeg = frameBuffer.subarray(start, end + JPEG_EOI.length);
frameBuffer = frameBuffer.subarray(end + JPEG_EOI.length);
function handleJpegFrame(jpeg) {
const timestamp = frameIndex / fps;
frameIndex += 1;
if (!isWebSocketOpen(websocket)) {
return;
}
const sendResult = sendFramePacket(websocket, timestamp, jpeg, (error) => {
if (error && !closed) {
stop('websocket_send_error');
@@ -1372,13 +1317,14 @@ function createPlayback(session) {
});
if (sendResult === 'closed') {
return;
return false;
}
if (sendResult === 'skipped') {
skippedFrames += 1;
}
}
return true;
}
function stop(reason) {
@@ -1436,12 +1382,101 @@ function isWebSocketOpen(websocket) {
return websocket?.readyState === WebSocket.OPEN;
}
function createJpegFrameParser(onFrame) {
let collecting = false;
let pendingMarkerByte = false;
let parts = [];
let byteLength = 0;
return {
write(chunk) {
const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
let offset = 0;
while (offset < buffer.length) {
if (!collecting) {
if (pendingMarkerByte && buffer[offset] === 0xd8) {
collecting = true;
pendingMarkerByte = false;
parts = [JPEG_SOI];
byteLength = JPEG_SOI.length;
offset += 1;
} else {
const start = buffer.indexOf(JPEG_SOI, offset);
if (start === -1) {
pendingMarkerByte = buffer[buffer.length - 1] === 0xff;
return true;
}
collecting = true;
pendingMarkerByte = false;
parts = [];
byteLength = 0;
offset = start;
}
}
if (pendingMarkerByte) {
pendingMarkerByte = false;
if (buffer[offset] === 0xd9) {
appendFramePart(buffer.subarray(offset, offset + 1));
offset += 1;
if (!emitFrame()) {
return false;
}
continue;
}
}
const end = buffer.indexOf(JPEG_EOI, offset);
if (end === -1) {
appendFramePart(buffer.subarray(offset));
pendingMarkerByte = buffer[buffer.length - 1] === 0xff;
return true;
}
appendFramePart(buffer.subarray(offset, end + JPEG_EOI.length));
offset = end + JPEG_EOI.length;
if (!emitFrame()) {
return false;
}
}
return true;
},
};
function appendFramePart(part) {
if (part.length === 0) {
return;
}
parts.push(part);
byteLength += part.length;
}
function emitFrame() {
const frame = { parts, byteLength };
collecting = false;
pendingMarkerByte = false;
parts = [];
byteLength = 0;
return onFrame(frame) !== false;
}
}
function sendFramePacket(websocket, timestamp, jpeg, onError) {
if (!isWebSocketOpen(websocket)) {
return 'closed';
}
const packetBytes = 8 + jpeg.length;
const packetBytes = 8 + getJpegFrameByteLength(jpeg);
if (websocket.bufferedAmount > 0 && websocket.bufferedAmount + packetBytes > MAX_WS_BUFFER_BYTES) {
return 'skipped';
@@ -1449,7 +1484,7 @@ function sendFramePacket(websocket, timestamp, jpeg, onError) {
const packet = Buffer.allocUnsafe(packetBytes);
packet.writeDoubleLE(timestamp, 0);
jpeg.copy(packet, 8);
copyJpegFrame(jpeg, packet, 8);
websocket.send(packet, { binary: true }, (error) => {
if (error) {
onError(error);
@@ -1458,6 +1493,24 @@ function sendFramePacket(websocket, timestamp, jpeg, onError) {
return 'sent';
}
function getJpegFrameByteLength(jpeg) {
return Buffer.isBuffer(jpeg) ? jpeg.length : jpeg.byteLength;
}
function copyJpegFrame(jpeg, packet, offset) {
if (Buffer.isBuffer(jpeg)) {
jpeg.copy(packet, offset);
return;
}
let writeOffset = offset;
for (const part of jpeg.parts) {
part.copy(packet, writeOffset);
writeOffset += part.length;
}
}
function createSourceInput(sessionId, kind) {
const token = randomUUID();
sourceTokens.set(token, { sessionId, kind, createdAt: Date.now() });