potential fix for OOM

This commit is contained in:
2026-05-02 00:05:11 -07:00
parent 0ec8f0faf9
commit acd73436a7
2 changed files with 45 additions and 9 deletions

View File

@@ -1417,6 +1417,7 @@ function createRelayInputBranch(stream, onError) {
let accepting = true;
let ending = false;
let drainPending = false;
let streamPendingBytes = 0;
let queue = [];
let queueBytes = 0;
let peakBytes = 0;
@@ -1424,18 +1425,28 @@ function createRelayInputBranch(stream, onError) {
stream.on('drain', () => {
drainPending = false;
streamPendingBytes = 0;
updatePeakBytes();
notifyWaiters();
flush();
});
stream.on('error', (error) => {
accepting = false;
streamPendingBytes = 0;
notifyWaiters();
onError(error);
});
stream.on('close', () => {
accepting = false;
streamPendingBytes = 0;
notifyWaiters();
});
return {
get queueBytes() {
return queueBytes;
return getPendingBytes();
},
get peakBytes() {
return peakBytes;
@@ -1451,9 +1462,9 @@ function createRelayInputBranch(stream, onError) {
queue.push(chunk);
queueBytes += chunk.length;
peakBytes = Math.max(peakBytes, queueBytes);
updatePeakBytes();
notifyWaiters();
return queueBytes <= MAX_RELAY_BRANCH_QUEUE_BYTES;
return getPendingBytes() <= MAX_RELAY_BRANCH_QUEUE_BYTES;
},
end() {
accepting = false;
@@ -1465,6 +1476,7 @@ function createRelayInputBranch(stream, onError) {
ending = false;
queue = [];
queueBytes = 0;
streamPendingBytes = 0;
notifyWaiters();
if (!stream.destroyed) {
@@ -1472,7 +1484,7 @@ function createRelayInputBranch(stream, onError) {
}
},
waitForCapacity() {
if (queueBytes <= RELAY_BRANCH_PAUSE_BYTES || !accepting || stream.destroyed) {
if (getPendingBytes() <= RELAY_BRANCH_PAUSE_BYTES || !accepting || stream.destroyed) {
return Promise.resolve();
}
@@ -1484,10 +1496,17 @@ function createRelayInputBranch(stream, onError) {
function writeNow(chunk) {
try {
drainPending = !stream.write(chunk);
if (!stream.write(chunk)) {
drainPending = true;
streamPendingBytes += chunk.length;
updatePeakBytes();
}
notifyWaiters();
return true;
} catch (error) {
accepting = false;
streamPendingBytes = 0;
notifyWaiters();
onError(error);
return false;
@@ -1498,11 +1517,12 @@ function createRelayInputBranch(stream, onError) {
while (queue.length > 0 && !drainPending && !stream.destroyed) {
const chunk = queue.shift();
queueBytes -= chunk.length;
notifyWaiters();
if (!writeNow(chunk)) {
return;
}
notifyWaiters();
}
if (ending && queue.length === 0 && !drainPending && !stream.destroyed) {
@@ -1511,7 +1531,7 @@ function createRelayInputBranch(stream, onError) {
}
function notifyWaiters() {
if (queueBytes > RELAY_BRANCH_PAUSE_BYTES && accepting && !stream.destroyed) {
if (getPendingBytes() > RELAY_BRANCH_PAUSE_BYTES && accepting && !stream.destroyed) {
return;
}
@@ -1522,11 +1542,25 @@ function createRelayInputBranch(stream, onError) {
resolve();
}
}
function getPendingBytes() {
return queueBytes + streamPendingBytes;
}
function updatePeakBytes() {
peakBytes = Math.max(peakBytes, getPendingBytes());
}
}
async function waitForRelayCapacity(branches, shouldStop) {
while (!shouldStop() && branches.some((branch) => branch.queueBytes > RELAY_BRANCH_PAUSE_BYTES)) {
await Promise.race(branches.map((branch) => branch.waitForCapacity()));
while (!shouldStop()) {
const blockedBranches = branches.filter((branch) => branch.queueBytes > RELAY_BRANCH_PAUSE_BYTES);
if (blockedBranches.length === 0) {
return;
}
await Promise.race(blockedBranches.map((branch) => branch.waitForCapacity()));
}
}