diff --git a/server/index.js b/server/index.js index 6d7896a..bdc3f07 100644 --- a/server/index.js +++ b/server/index.js @@ -617,11 +617,11 @@ function streamSplitFrames(websocket, session) { const stderrTail = createFfmpegStderrTail(); const { fps, quality, width } = session.options; const label = `frames:${shortId(session.id)}`; - let frameBuffer = Buffer.alloc(0); let frameIndex = 0; let skippedFrames = 0; let stopReason = 'process_exit'; let serverClosingWebSocket = false; + const frameParser = createJpegFrameParser(handleJpegFrame); logger.start(); @@ -694,45 +694,27 @@ function streamSplitFrames(websocket, session) { }); function handleFrameData(chunk) { - frameBuffer = Buffer.concat([frameBuffer, chunk]); + frameParser.write(chunk); + } - for (;;) { - const start = frameBuffer.indexOf(JPEG_SOI); + function handleJpegFrame(jpeg) { + const timestamp = frameIndex / fps; + frameIndex += 1; - if (start === -1) { - frameBuffer = Buffer.alloc(0); - return; - } + const sendResult = sendFramePacket(websocket, timestamp, jpeg, () => { + stopReason = 'websocket_send_error'; + stopProcess(ffmpeg); + }); - const end = frameBuffer.indexOf(JPEG_EOI, start + JPEG_SOI.length); - - if (end === -1) { - frameBuffer = start === 0 ? frameBuffer : frameBuffer.subarray(start); - return; - } - - const jpeg = frameBuffer.subarray(start, end + JPEG_EOI.length); - frameBuffer = frameBuffer.subarray(end + JPEG_EOI.length); - const timestamp = frameIndex / fps; - frameIndex += 1; - - if (!isWebSocketOpen(websocket)) { - return; - } - - const sendResult = sendFramePacket(websocket, timestamp, jpeg, () => { - stopReason = 'websocket_send_error'; - stopProcess(ffmpeg); - }); - - if (sendResult === 'closed') { - return; - } - - if (sendResult === 'skipped') { - skippedFrames += 1; - } + if (sendResult === 'closed') { + return false; } + + if (sendResult === 'skipped') { + skippedFrames += 1; + } + + return true; } } @@ -749,7 +731,6 @@ function createRelayPlayback(session) { let frameLogger = null; let sourceController = null; let sourceBytes = 0; - let frameBuffer = Buffer.alloc(0); let frameIndex = 0; let skippedFrames = 0; let stopReason = 'process_exit'; @@ -767,6 +748,7 @@ function createRelayPlayback(session) { let readyTimer = null; const audioStderr = createFfmpegStderrTail(); const frameStderr = createFfmpegStderrTail(); + const frameParser = createJpegFrameParser(handleJpegFrame); const playback = { get closed() { @@ -987,46 +969,28 @@ function createRelayPlayback(session) { } function handleFrameData(chunk) { - frameBuffer = Buffer.concat([frameBuffer, chunk]); + frameParser.write(chunk); + } - for (;;) { - const start = frameBuffer.indexOf(JPEG_SOI); + function handleJpegFrame(jpeg) { + const timestamp = frameIndex / fps; + frameIndex += 1; - if (start === -1) { - frameBuffer = Buffer.alloc(0); - return; + const sendResult = sendFramePacket(websocket, timestamp, jpeg, (error) => { + if (error && !closed) { + stop('websocket_send_error'); } + }); - const end = frameBuffer.indexOf(JPEG_EOI, start + JPEG_SOI.length); - - if (end === -1) { - frameBuffer = start === 0 ? frameBuffer : frameBuffer.subarray(start); - return; - } - - const jpeg = frameBuffer.subarray(start, end + JPEG_EOI.length); - frameBuffer = frameBuffer.subarray(end + JPEG_EOI.length); - const timestamp = frameIndex / fps; - frameIndex += 1; - - if (!isWebSocketOpen(websocket)) { - return; - } - - const sendResult = sendFramePacket(websocket, timestamp, jpeg, (error) => { - if (error && !closed) { - stop('websocket_send_error'); - } - }); - - if (sendResult === 'closed') { - return; - } - - if (sendResult === 'skipped') { - skippedFrames += 1; - } + if (sendResult === 'closed') { + return false; } + + if (sendResult === 'skipped') { + skippedFrames += 1; + } + + return true; } function stop(reason) { @@ -1136,7 +1100,6 @@ function createPlayback(session) { let logger = null; let releaseSource = () => {}; let stderr = ''; - let frameBuffer = Buffer.alloc(0); let frameIndex = 0; let skippedFrames = 0; let stopReason = 'process_exit'; @@ -1144,6 +1107,7 @@ function createPlayback(session) { let closed = false; let readyTimer = null; const stderrTail = createFfmpegStderrTail(); + const frameParser = createJpegFrameParser(handleJpegFrame); const playback = { get closed() { @@ -1339,46 +1303,28 @@ function createPlayback(session) { } function handleFrameData(chunk) { - frameBuffer = Buffer.concat([frameBuffer, chunk]); + frameParser.write(chunk); + } - for (;;) { - const start = frameBuffer.indexOf(JPEG_SOI); + function handleJpegFrame(jpeg) { + const timestamp = frameIndex / fps; + frameIndex += 1; - if (start === -1) { - frameBuffer = Buffer.alloc(0); - return; + const sendResult = sendFramePacket(websocket, timestamp, jpeg, (error) => { + if (error && !closed) { + stop('websocket_send_error'); } + }); - const end = frameBuffer.indexOf(JPEG_EOI, start + JPEG_SOI.length); - - if (end === -1) { - frameBuffer = start === 0 ? frameBuffer : frameBuffer.subarray(start); - return; - } - - const jpeg = frameBuffer.subarray(start, end + JPEG_EOI.length); - frameBuffer = frameBuffer.subarray(end + JPEG_EOI.length); - const timestamp = frameIndex / fps; - frameIndex += 1; - - if (!isWebSocketOpen(websocket)) { - return; - } - - const sendResult = sendFramePacket(websocket, timestamp, jpeg, (error) => { - if (error && !closed) { - stop('websocket_send_error'); - } - }); - - if (sendResult === 'closed') { - return; - } - - if (sendResult === 'skipped') { - skippedFrames += 1; - } + if (sendResult === 'closed') { + return false; } + + if (sendResult === 'skipped') { + skippedFrames += 1; + } + + return true; } function stop(reason) { @@ -1436,12 +1382,101 @@ function isWebSocketOpen(websocket) { return websocket?.readyState === WebSocket.OPEN; } +function createJpegFrameParser(onFrame) { + let collecting = false; + let pendingMarkerByte = false; + let parts = []; + let byteLength = 0; + + return { + write(chunk) { + const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk); + let offset = 0; + + while (offset < buffer.length) { + if (!collecting) { + if (pendingMarkerByte && buffer[offset] === 0xd8) { + collecting = true; + pendingMarkerByte = false; + parts = [JPEG_SOI]; + byteLength = JPEG_SOI.length; + offset += 1; + } else { + const start = buffer.indexOf(JPEG_SOI, offset); + + if (start === -1) { + pendingMarkerByte = buffer[buffer.length - 1] === 0xff; + return true; + } + + collecting = true; + pendingMarkerByte = false; + parts = []; + byteLength = 0; + offset = start; + } + } + + if (pendingMarkerByte) { + pendingMarkerByte = false; + + if (buffer[offset] === 0xd9) { + appendFramePart(buffer.subarray(offset, offset + 1)); + offset += 1; + + if (!emitFrame()) { + return false; + } + + continue; + } + } + + const end = buffer.indexOf(JPEG_EOI, offset); + + if (end === -1) { + appendFramePart(buffer.subarray(offset)); + pendingMarkerByte = buffer[buffer.length - 1] === 0xff; + return true; + } + + appendFramePart(buffer.subarray(offset, end + JPEG_EOI.length)); + offset = end + JPEG_EOI.length; + + if (!emitFrame()) { + return false; + } + } + + return true; + }, + }; + + function appendFramePart(part) { + if (part.length === 0) { + return; + } + + parts.push(part); + byteLength += part.length; + } + + function emitFrame() { + const frame = { parts, byteLength }; + collecting = false; + pendingMarkerByte = false; + parts = []; + byteLength = 0; + return onFrame(frame) !== false; + } +} + function sendFramePacket(websocket, timestamp, jpeg, onError) { if (!isWebSocketOpen(websocket)) { return 'closed'; } - const packetBytes = 8 + jpeg.length; + const packetBytes = 8 + getJpegFrameByteLength(jpeg); if (websocket.bufferedAmount > 0 && websocket.bufferedAmount + packetBytes > MAX_WS_BUFFER_BYTES) { return 'skipped'; @@ -1449,7 +1484,7 @@ function sendFramePacket(websocket, timestamp, jpeg, onError) { const packet = Buffer.allocUnsafe(packetBytes); packet.writeDoubleLE(timestamp, 0); - jpeg.copy(packet, 8); + copyJpegFrame(jpeg, packet, 8); websocket.send(packet, { binary: true }, (error) => { if (error) { onError(error); @@ -1458,6 +1493,24 @@ function sendFramePacket(websocket, timestamp, jpeg, onError) { return 'sent'; } +function getJpegFrameByteLength(jpeg) { + return Buffer.isBuffer(jpeg) ? jpeg.length : jpeg.byteLength; +} + +function copyJpegFrame(jpeg, packet, offset) { + if (Buffer.isBuffer(jpeg)) { + jpeg.copy(packet, offset); + return; + } + + let writeOffset = offset; + + for (const part of jpeg.parts) { + part.copy(packet, writeOffset); + writeOffset += part.length; + } +} + function createSourceInput(sessionId, kind) { const token = randomUUID(); sourceTokens.set(token, { sessionId, kind, createdAt: Date.now() });