From acd73436a78c160ae3ba90d460c11e55594312bb Mon Sep 17 00:00:00 2001 From: James Magahern Date: Sat, 2 May 2026 00:05:11 -0700 Subject: [PATCH] potential fix for OOM --- AGENTS.md | 2 ++ server/index.js | 52 ++++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 45 insertions(+), 9 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 725a7aa..1ad3f5f 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -121,6 +121,8 @@ Important behavior: - Uses bounded branch queues via `createRelayInputBranch`. - Pauses upstream reading while any branch queue exceeds half of `MAX_RELAY_BRANCH_QUEUE_BYTES`. - Stops playback if any branch queue exceeds `MAX_RELAY_BRANCH_QUEUE_BYTES`. +- Backpressure accounting must include both chunks queued in JavaScript and bytes already written to ffmpeg stdin while waiting for `drain`. Otherwise fast movie sources can outrun realtime ffmpeg consumption and grow Node heap until OOM. +- When waiting for relay capacity, wait only on branches that are actually over the pause threshold. Including already-ready branches in a `Promise.race` can create an immediate-resolution spin loop. Relay mode works best for sequential stream containers such as MPEG-TS/IPTV. It may be less reliable for file formats that require seeking or late metadata, such as some MP4 files. diff --git a/server/index.js b/server/index.js index 5a212a0..f445dfe 100644 --- a/server/index.js +++ b/server/index.js @@ -1417,6 +1417,7 @@ function createRelayInputBranch(stream, onError) { let accepting = true; let ending = false; let drainPending = false; + let streamPendingBytes = 0; let queue = []; let queueBytes = 0; let peakBytes = 0; @@ -1424,18 +1425,28 @@ function createRelayInputBranch(stream, onError) { stream.on('drain', () => { drainPending = false; + streamPendingBytes = 0; + updatePeakBytes(); + notifyWaiters(); flush(); }); stream.on('error', (error) => { accepting = false; + streamPendingBytes = 0; notifyWaiters(); onError(error); }); + stream.on('close', () => { + accepting = false; + streamPendingBytes = 0; + notifyWaiters(); + }); + return { get queueBytes() { - return queueBytes; + return getPendingBytes(); }, get peakBytes() { return peakBytes; @@ -1451,9 +1462,9 @@ function createRelayInputBranch(stream, onError) { queue.push(chunk); queueBytes += chunk.length; - peakBytes = Math.max(peakBytes, queueBytes); + updatePeakBytes(); notifyWaiters(); - return queueBytes <= MAX_RELAY_BRANCH_QUEUE_BYTES; + return getPendingBytes() <= MAX_RELAY_BRANCH_QUEUE_BYTES; }, end() { accepting = false; @@ -1465,6 +1476,7 @@ function createRelayInputBranch(stream, onError) { ending = false; queue = []; queueBytes = 0; + streamPendingBytes = 0; notifyWaiters(); if (!stream.destroyed) { @@ -1472,7 +1484,7 @@ function createRelayInputBranch(stream, onError) { } }, waitForCapacity() { - if (queueBytes <= RELAY_BRANCH_PAUSE_BYTES || !accepting || stream.destroyed) { + if (getPendingBytes() <= RELAY_BRANCH_PAUSE_BYTES || !accepting || stream.destroyed) { return Promise.resolve(); } @@ -1484,10 +1496,17 @@ function createRelayInputBranch(stream, onError) { function writeNow(chunk) { try { - drainPending = !stream.write(chunk); + if (!stream.write(chunk)) { + drainPending = true; + streamPendingBytes += chunk.length; + updatePeakBytes(); + } + + notifyWaiters(); return true; } catch (error) { accepting = false; + streamPendingBytes = 0; notifyWaiters(); onError(error); return false; @@ -1498,11 +1517,12 @@ function createRelayInputBranch(stream, onError) { while (queue.length > 0 && !drainPending && !stream.destroyed) { const chunk = queue.shift(); queueBytes -= chunk.length; - notifyWaiters(); if (!writeNow(chunk)) { return; } + + notifyWaiters(); } if (ending && queue.length === 0 && !drainPending && !stream.destroyed) { @@ -1511,7 +1531,7 @@ function createRelayInputBranch(stream, onError) { } function notifyWaiters() { - if (queueBytes > RELAY_BRANCH_PAUSE_BYTES && accepting && !stream.destroyed) { + if (getPendingBytes() > RELAY_BRANCH_PAUSE_BYTES && accepting && !stream.destroyed) { return; } @@ -1522,11 +1542,25 @@ function createRelayInputBranch(stream, onError) { resolve(); } } + + function getPendingBytes() { + return queueBytes + streamPendingBytes; + } + + function updatePeakBytes() { + peakBytes = Math.max(peakBytes, getPendingBytes()); + } } async function waitForRelayCapacity(branches, shouldStop) { - while (!shouldStop() && branches.some((branch) => branch.queueBytes > RELAY_BRANCH_PAUSE_BYTES)) { - await Promise.race(branches.map((branch) => branch.waitForCapacity())); + while (!shouldStop()) { + const blockedBranches = branches.filter((branch) => branch.queueBytes > RELAY_BRANCH_PAUSE_BYTES); + + if (blockedBranches.length === 0) { + return; + } + + await Promise.race(blockedBranches.map((branch) => branch.waitForCapacity())); } }