relay approach

This commit is contained in:
2026-05-01 23:21:16 -07:00
parent 2ec1b056a0
commit 1e214e8812
3 changed files with 826 additions and 29 deletions

View File

@@ -20,12 +20,15 @@ const PORT = Number(process.env.PORT ?? 3000);
const FFMPEG_PATH = process.env.FFMPEG_PATH ?? 'ffmpeg';
const FFMPEG_LOG_LEVEL = process.env.FFMPEG_LOG_LEVEL ?? 'warning';
const FFMPEG_INPUT_SEEKABLE = process.env.FFMPEG_INPUT_SEEKABLE ?? '0';
const PLAYBACK_CONNECTION_MODE = parsePlaybackConnectionMode(process.env.PLAYBACK_CONNECTION_MODE ?? process.env.PLAYBACK_MODE);
const RECENT_URLS_PATH = process.env.RECENT_URLS_PATH ?? path.join(__dirname, '..', 'data', 'recent-urls.json');
const RECENT_URL_LIMIT = clampInteger(process.env.RECENT_URL_LIMIT, 12, 1, 50);
const SESSION_TTL_MS = 60 * 60 * 1000;
const PLAYBACK_READY_TIMEOUT_MS = 15 * 1000;
const MAX_WS_BUFFER_BYTES = 12 * 1024 * 1024;
const MAX_AUDIO_QUEUE_BYTES = clampInteger(process.env.MAX_AUDIO_QUEUE_BYTES, 16 * 1024 * 1024, 256 * 1024, 128 * 1024 * 1024);
const MAX_RELAY_BRANCH_QUEUE_BYTES = clampInteger(process.env.MAX_RELAY_BRANCH_QUEUE_BYTES, 16 * 1024 * 1024, 512 * 1024, 256 * 1024 * 1024);
const RELAY_BRANCH_PAUSE_BYTES = Math.floor(MAX_RELAY_BRANCH_QUEUE_BYTES / 2);
const JPEG_SOI = Buffer.from([0xff, 0xd8]);
const JPEG_EOI = Buffer.from([0xff, 0xd9]);
@@ -47,7 +50,7 @@ app.use(express.json({ limit: '32kb' }));
app.use(express.static(publicDir));
app.get('/api/health', (_request, response) => {
response.json({ ok: true, ffmpeg: FFMPEG_PATH });
response.json({ ok: true, ffmpeg: FFMPEG_PATH, playbackConnectionMode: PLAYBACK_CONNECTION_MODE });
});
app.get('/api/recent-urls', (_request, response) => {
@@ -178,7 +181,12 @@ app.get('/audio/:sessionId', (request, response) => {
return;
}
getOrCreatePlayback(session).attachAudio(request, response);
if (PLAYBACK_CONNECTION_MODE === 'single' || PLAYBACK_CONNECTION_MODE === 'relay') {
getOrCreatePlayback(session).attachAudio(request, response);
return;
}
streamSplitAudio(request, response, session);
});
server.on('upgrade', (request, socket, head) => {
@@ -205,7 +213,12 @@ wss.on('connection', (websocket, _request, sessionId) => {
return;
}
getOrCreatePlayback(session).attachFrames(websocket);
if (PLAYBACK_CONNECTION_MODE === 'single' || PLAYBACK_CONNECTION_MODE === 'relay') {
getOrCreatePlayback(session).attachFrames(websocket);
return;
}
streamSplitFrames(websocket, session);
});
setInterval(() => {
@@ -227,7 +240,7 @@ setInterval(() => {
await loadRecentUrls();
server.listen(PORT, () => {
console.log(`Frame stream app listening at http://localhost:${PORT}`);
console.log(`Frame stream app listening at http://localhost:${PORT} mode=${PLAYBACK_CONNECTION_MODE}`);
});
function parseStreamUrl(value) {
@@ -257,6 +270,23 @@ function parsePlaybackOptions(body) {
};
}
function parsePlaybackConnectionMode(value) {
if (value === 'relay' || value === 'tee') {
return 'relay';
}
if (value === 'single' || value === 'unified') {
return 'single';
}
if (value === 'split' || value === 'separate' || value === undefined || value === '') {
return 'split';
}
console.warn(`Unknown PLAYBACK_CONNECTION_MODE "${value}", using split.`);
return 'split';
}
function clampInteger(value, fallback, min, max) {
const parsed = Number(value);
@@ -332,11 +362,555 @@ function getOrCreatePlayback(session) {
return existing;
}
const playback = createPlayback(session);
const playback = PLAYBACK_CONNECTION_MODE === 'relay' ? createRelayPlayback(session) : createPlayback(session);
playbacks.set(session.id, playback);
return playback;
}
function streamSplitAudio(request, response, session) {
const worker = createAudioWorker(session);
const releaseWorker = once(worker.release);
const ffmpeg = spawnFfmpeg(worker.args);
const logger = createFfmpegLogger('audio', session.id, ffmpeg.pid);
const stderrTail = createFfmpegStderrTail();
let stopReason = 'process_exit';
logger.start();
response.set({
'Cache-Control': 'no-store',
'Content-Type': 'audio/mpeg',
'X-Content-Type-Options': 'nosniff',
});
response.flushHeaders();
ffmpeg.stderr.on('data', (chunk) => {
stderrTail.write(chunk);
logger.stderr(chunk);
});
ffmpeg.stdout.on('error', (error) => {
logWarn(`audio output error kind=audio:${shortId(session.id)} error=${oneLine(error.message)}`);
stopReason = 'audio_output_error';
stopProcess(ffmpeg);
});
ffmpeg.stdout.pipe(response);
ffmpeg.on('error', (error) => {
stopReason = 'start_error';
releaseWorker();
logger.error(error);
if (!response.writableEnded) {
response.end();
}
});
ffmpeg.on('close', (code, signal) => {
stderrTail.flush();
releaseWorker();
logger.close(code, signal, stopReason, stderrTail.value);
if (!response.writableEnded) {
response.end();
}
});
const cleanup = once(() => {
stopReason = 'client_disconnect';
stopProcess(ffmpeg);
});
request.on('close', cleanup);
response.on('close', cleanup);
}
function streamSplitFrames(websocket, session) {
const worker = createFrameWorker(session);
const releaseWorker = once(worker.release);
const ffmpeg = spawnFfmpeg(worker.args);
const logger = createFfmpegLogger('frames', session.id, ffmpeg.pid);
const stderrTail = createFfmpegStderrTail();
const { fps, quality, width } = session.options;
const label = `frames:${shortId(session.id)}`;
let frameBuffer = Buffer.alloc(0);
let frameIndex = 0;
let skippedFrames = 0;
let stopReason = 'process_exit';
logger.start();
sendJson(websocket, {
type: 'ready',
codec: 'jpeg',
fps,
quality,
width,
});
ffmpeg.stdout.on('data', handleFrameData);
ffmpeg.stdout.on('error', (error) => {
logWarn(`frame output error kind=${label} error=${oneLine(error.message)}`);
stopReason = 'frame_output_error';
stopProcess(ffmpeg);
});
ffmpeg.stderr.on('data', (chunk) => {
stderrTail.write(chunk);
logger.stderr(chunk);
});
ffmpeg.on('error', (error) => {
stopReason = 'start_error';
releaseWorker();
logger.error(error);
sendJson(websocket, {
type: 'error',
message: `Failed to start ffmpeg: ${error.message}`,
});
websocket.close(1011, 'ffmpeg start failed');
});
ffmpeg.on('close', (code, signal) => {
stderrTail.flush();
releaseWorker();
logger.close(code, signal, stopReason, stderrTail.value);
if (isWebSocketOpen(websocket) && stopReason !== 'client_disconnect') {
sendJson(websocket, {
type: 'end',
code,
signal,
skippedFrames,
message: summarizeFfmpegExit(code, signal, stderrTail.value),
});
websocket.close(1000, 'ffmpeg exited');
}
logInfo(`frames closed kind=${label} reason=${stopReason} frames=${frameIndex} skippedFrames=${skippedFrames}`);
});
websocket.on('close', () => {
stopReason = 'client_disconnect';
stopProcess(ffmpeg);
});
websocket.on('error', () => {
stopReason = 'websocket_error';
stopProcess(ffmpeg);
});
function handleFrameData(chunk) {
frameBuffer = Buffer.concat([frameBuffer, chunk]);
for (;;) {
const start = frameBuffer.indexOf(JPEG_SOI);
if (start === -1) {
frameBuffer = Buffer.alloc(0);
return;
}
const end = frameBuffer.indexOf(JPEG_EOI, start + JPEG_SOI.length);
if (end === -1) {
frameBuffer = start === 0 ? frameBuffer : frameBuffer.subarray(start);
return;
}
const jpeg = frameBuffer.subarray(start, end + JPEG_EOI.length);
frameBuffer = frameBuffer.subarray(end + JPEG_EOI.length);
const timestamp = frameIndex / fps;
frameIndex += 1;
if (!isWebSocketOpen(websocket)) {
return;
}
if (websocket.bufferedAmount > MAX_WS_BUFFER_BYTES) {
skippedFrames += 1;
continue;
}
const packet = Buffer.allocUnsafe(8 + jpeg.length);
packet.writeDoubleLE(timestamp, 0);
jpeg.copy(packet, 8);
websocket.send(packet, { binary: true }, (error) => {
if (error) {
stopReason = 'websocket_send_error';
stopProcess(ffmpeg);
}
});
}
}
}
function createRelayPlayback(session) {
const { fps, quality, width } = session.options;
const label = `relay:${shortId(session.id)}`;
let audioResponse = null;
let websocket = null;
let audioFfmpeg = null;
let frameFfmpeg = null;
let audioInput = null;
let frameInput = null;
let audioLogger = null;
let frameLogger = null;
let sourceController = null;
let sourceBytes = 0;
let frameBuffer = Buffer.alloc(0);
let frameIndex = 0;
let skippedFrames = 0;
let stopReason = 'process_exit';
let started = false;
let stopping = false;
let closed = false;
let sourceEnded = false;
let audioClosed = false;
let frameClosed = false;
let frameExitCode = null;
let frameExitSignal = null;
let frameEndSent = false;
let readyTimer = null;
const audioStderr = createFfmpegStderrTail();
const frameStderr = createFfmpegStderrTail();
const playback = {
get closed() {
return closed;
},
attachAudio(request, response) {
if (closed) {
response.status(410).end();
return;
}
if (audioResponse && !audioResponse.writableEnded) {
response.status(409).json({ error: 'Audio stream already attached for this session.' });
return;
}
audioResponse = response;
response.set({
'Cache-Control': 'no-store',
'Content-Type': 'audio/mpeg',
'X-Content-Type-Options': 'nosniff',
});
response.flushHeaders();
const cleanup = once(() => {
if (!closed) {
stop('client_disconnect');
}
});
request.on('close', cleanup);
response.on('close', cleanup);
maybeStart();
},
attachFrames(nextWebsocket) {
if (closed) {
nextWebsocket.close(1011, 'Playback closed');
return;
}
if (websocket && websocket.readyState === WebSocket.OPEN) {
nextWebsocket.close(1013, 'Frame stream already attached for this session');
return;
}
websocket = nextWebsocket;
sendJson(websocket, {
type: 'ready',
codec: 'jpeg',
fps,
quality,
width,
});
websocket.on('close', () => {
if (!closed) {
stop('client_disconnect');
}
});
websocket.on('error', () => {
if (!closed) {
stop('websocket_error');
}
});
maybeStart();
},
};
readyTimer = setTimeout(() => {
if (!started && !closed) {
stop('consumer_attach_timeout');
}
}, PLAYBACK_READY_TIMEOUT_MS).unref();
return playback;
function maybeStart() {
if (started || closed || !audioResponse || !isWebSocketOpen(websocket)) {
return;
}
started = true;
clearReadyTimer();
audioFfmpeg = spawnFfmpeg(buildRelayAudioArgs(session), ['pipe', 'pipe', 'pipe']);
frameFfmpeg = spawnFfmpeg(buildRelayFrameArgs(session), ['pipe', 'pipe', 'pipe']);
audioLogger = createFfmpegLogger('relay-audio', session.id, audioFfmpeg.pid);
frameLogger = createFfmpegLogger('relay-frames', session.id, frameFfmpeg.pid);
audioInput = createRelayInputBranch(audioFfmpeg.stdin, handleRelayInputError);
frameInput = createRelayInputBranch(frameFfmpeg.stdin, handleRelayInputError);
audioLogger.start();
frameLogger.start();
audioFfmpeg.stdout.on('error', (error) => {
logWarn(`audio output error kind=${label} error=${oneLine(error.message)}`);
stop('audio_output_error');
});
audioFfmpeg.stdout.pipe(audioResponse);
frameFfmpeg.stdout.on('error', (error) => {
logWarn(`frame output error kind=${label} error=${oneLine(error.message)}`);
stop('frame_output_error');
});
frameFfmpeg.stdout.on('data', handleFrameData);
audioFfmpeg.stderr.on('data', (chunk) => {
audioStderr.write(chunk);
audioLogger.stderr(chunk);
});
frameFfmpeg.stderr.on('data', (chunk) => {
frameStderr.write(chunk);
frameLogger.stderr(chunk);
});
audioFfmpeg.on('error', (error) => {
audioLogger.error(error);
stop('audio_start_error');
});
frameFfmpeg.on('error', (error) => {
frameLogger.error(error);
stop('frame_start_error');
});
audioFfmpeg.on('close', (code, signal) => {
audioClosed = true;
audioStderr.flush();
audioLogger.close(code, signal, stopReason, audioStderr.value);
maybeFinishRelay();
});
frameFfmpeg.on('close', (code, signal) => {
frameClosed = true;
frameExitCode = code;
frameExitSignal = signal;
frameStderr.flush();
frameLogger.close(code, signal, stopReason, frameStderr.value);
sendFrameEndIfNeeded();
maybeFinishRelay();
});
void startRelaySource();
}
async function startRelaySource() {
const startedAt = Date.now();
let upstreamStatus = 'pending';
let upstreamEnded = false;
sourceController = new AbortController();
try {
const headers = new Headers();
headers.set('accept-encoding', 'identity');
const upstream = await fetch(session.url, {
headers,
redirect: 'follow',
signal: sourceController.signal,
});
upstreamStatus = String(upstream.status);
logInfo(`relay source connected kind=${label} status=${upstream.status} contentType=${upstream.headers.get('content-type') ?? 'unknown'}`);
if (!upstream.ok || !upstream.body) {
throw new Error(`Source returned HTTP ${upstream.status}`);
}
for await (const chunk of Readable.fromWeb(upstream.body)) {
if (closed || stopping) {
return;
}
const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
sourceBytes += buffer.length;
if (!audioInput.write(buffer) || !frameInput.write(buffer)) {
stop('relay_input_queue_overflow');
return;
}
await waitForRelayCapacity([audioInput, frameInput], () => closed || stopping);
}
upstreamEnded = true;
sourceEnded = true;
audioInput.end();
frameInput.end();
} catch (error) {
if (sourceController.signal.aborted || closed) {
return;
}
logWarn(`relay source failed kind=${label} error=${oneLine(error.message)}`);
stop('relay_source_error');
} finally {
logInfo(`relay source closed kind=${label} status=${upstreamStatus} bytes=${sourceBytes} upstreamEnded=${upstreamEnded} durationMs=${Date.now() - startedAt}`);
}
}
function handleRelayInputError(error) {
if (!closed && !stopping) {
logWarn(`relay input error kind=${label} error=${oneLine(error.message)}`);
stop('relay_input_error');
}
}
function handleFrameData(chunk) {
frameBuffer = Buffer.concat([frameBuffer, chunk]);
for (;;) {
const start = frameBuffer.indexOf(JPEG_SOI);
if (start === -1) {
frameBuffer = Buffer.alloc(0);
return;
}
const end = frameBuffer.indexOf(JPEG_EOI, start + JPEG_SOI.length);
if (end === -1) {
frameBuffer = start === 0 ? frameBuffer : frameBuffer.subarray(start);
return;
}
const jpeg = frameBuffer.subarray(start, end + JPEG_EOI.length);
frameBuffer = frameBuffer.subarray(end + JPEG_EOI.length);
const timestamp = frameIndex / fps;
frameIndex += 1;
if (!isWebSocketOpen(websocket)) {
return;
}
if (websocket.bufferedAmount > MAX_WS_BUFFER_BYTES) {
skippedFrames += 1;
continue;
}
const packet = Buffer.allocUnsafe(8 + jpeg.length);
packet.writeDoubleLE(timestamp, 0);
jpeg.copy(packet, 8);
websocket.send(packet, { binary: true }, (error) => {
if (error && !closed) {
stop('websocket_send_error');
}
});
}
}
function stop(reason) {
if (closed) {
return;
}
stopReason = reason;
stopping = true;
sourceController?.abort();
audioInput?.destroy();
frameInput?.destroy();
if (isChildRunning(audioFfmpeg)) {
stopProcess(audioFfmpeg);
}
if (isChildRunning(frameFfmpeg)) {
stopProcess(frameFfmpeg);
}
if (!isChildRunning(audioFfmpeg) && !isChildRunning(frameFfmpeg)) {
finishRelay(1000, 'relay stopped');
}
}
function maybeFinishRelay() {
if (closed) {
return;
}
if (!stopping && !sourceEnded) {
stop('relay_process_exit');
return;
}
if (audioClosed && frameClosed) {
finishRelay(1000, 'relay ended');
}
}
function sendFrameEndIfNeeded() {
if (frameEndSent || !isWebSocketOpen(websocket)) {
return;
}
frameEndSent = true;
sendJson(websocket, {
type: 'end',
code: frameExitCode,
signal: frameExitSignal,
skippedFrames,
message: summarizeFfmpegExit(frameExitCode, frameExitSignal, frameStderr.value),
});
websocket.close(1000, 'ffmpeg exited');
}
function finishRelay(websocketCode, websocketReason) {
if (closed) {
return;
}
closed = true;
clearReadyTimer();
sourceController?.abort();
audioInput?.destroy();
frameInput?.destroy();
playbacks.delete(session.id);
if (audioResponse && !audioResponse.writableEnded) {
audioResponse.end();
}
if (isWebSocketOpen(websocket) && frameEndSent) {
websocket.close(websocketCode, websocketReason);
} else {
sendFrameEndIfNeeded();
}
logInfo(`relay closed kind=${label} reason=${stopReason} sourceBytes=${sourceBytes} frames=${frameIndex} skippedFrames=${skippedFrames} audioInputPeakBytes=${audioInput?.peakBytes ?? 0} frameInputPeakBytes=${frameInput?.peakBytes ?? 0}`);
}
function clearReadyTimer() {
if (readyTimer) {
clearTimeout(readyTimer);
readyTimer = null;
}
}
}
function createPlayback(session) {
const { fps, quality, width } = session.options;
const label = `playback:${shortId(session.id)}`;
@@ -359,11 +933,7 @@ function createPlayback(session) {
let started = false;
let closed = false;
let readyTimer = null;
const stderrTailLogger = createLineLogger((line) => {
if (!shouldSuppressFfmpegLogLine(line)) {
stderr = appendTail(stderr, `${line}\n`);
}
});
const stderrTail = createFfmpegStderrTail();
const playback = {
get closed() {
@@ -471,7 +1041,7 @@ function createPlayback(session) {
frameStream.on('data', handleFrameData);
ffmpeg.stderr.on('data', (chunk) => {
stderrTailLogger.write(chunk);
stderrTail.write(chunk);
logger.stderr(chunk);
});
@@ -482,7 +1052,8 @@ function createPlayback(session) {
});
ffmpeg.on('close', (code, signal) => {
stderrTailLogger.flush();
stderrTail.flush();
stderr = stderrTail.value;
logger.close(code, signal, stopReason, stderr);
finishPlayback(1000, 'ffmpeg exited', code, signal);
});
@@ -662,21 +1233,35 @@ function getListeningPort() {
return typeof address === 'object' && address ? address.port : PORT;
}
function buildPlaybackArgs(session, inputUrl) {
const { fps, quality, width } = session.options;
const videoFilter = `fps=${fps},scale=w='min(${width},iw)':h=-2:flags=bicubic:out_range=pc,format=yuvj420p`;
function createAudioWorker(session) {
const source = createSourceInput(session.id, 'audio');
return {
args: buildAudioArgs(session, source.url),
release: source.release,
};
}
function createFrameWorker(session) {
const source = createSourceInput(session.id, 'frames');
return {
args: buildFrameArgs(session, source.url),
release: source.release,
};
}
function buildRelayAudioArgs(session) {
return buildAudioArgs(session, 'pipe:0', { seekable: false });
}
function buildRelayFrameArgs(session) {
return buildFrameArgs(session, 'pipe:0', { seekable: false });
}
function buildPlaybackArgs(session, inputUrl) {
return [
'-hide_banner',
'-nostdin',
'-loglevel',
FFMPEG_LOG_LEVEL,
'-nostats',
'-seekable',
FFMPEG_INPUT_SEEKABLE,
'-re',
'-i',
inputUrl,
...buildInputArgs(inputUrl),
'-map',
'0:a:0?',
'-vn',
@@ -693,6 +1278,61 @@ function buildPlaybackArgs(session, inputUrl) {
'pipe:1',
'-map',
'0:v:0',
...buildFrameOutputArgs(session, 'pipe:3'),
];
}
function buildAudioArgs(session, inputUrl, inputOptions) {
return [
...buildInputArgs(inputUrl, inputOptions),
'-map',
'0:a:0?',
'-vn',
'-ac',
'2',
'-ar',
'48000',
'-codec:a',
'libmp3lame',
'-b:a',
session.options.audioBitrate,
'-f',
'mp3',
'pipe:1',
];
}
function buildFrameArgs(session, inputUrl, inputOptions) {
return [
...buildInputArgs(inputUrl, inputOptions),
'-map',
'0:v:0',
...buildFrameOutputArgs(session, 'pipe:1'),
];
}
function buildInputArgs(inputUrl, { seekable = true } = {}) {
const args = [
'-hide_banner',
'-nostdin',
'-loglevel',
FFMPEG_LOG_LEVEL,
'-nostats',
];
if (seekable) {
args.push('-seekable', FFMPEG_INPUT_SEEKABLE);
}
args.push('-re', '-i', inputUrl);
return args;
}
function buildFrameOutputArgs(session, outputUrl) {
const { fps, quality, width } = session.options;
const videoFilter = `fps=${fps},scale=w='min(${width},iw)':h=-2:flags=bicubic:out_range=pc,format=yuvj420p`;
return [
'-an',
'-vf',
videoFilter,
@@ -706,7 +1346,7 @@ function buildPlaybackArgs(session, inputUrl) {
String(quality),
'-f',
'image2pipe',
'pipe:3',
outputUrl,
];
}
@@ -752,6 +1392,144 @@ function shouldSuppressFfmpegLogLine(line) {
return line.includes('deprecated pixel format used, make sure you did set range correctly');
}
function createFfmpegStderrTail() {
let value = '';
const lineLogger = createLineLogger((line) => {
if (!shouldSuppressFfmpegLogLine(line)) {
value = appendTail(value, `${line}\n`);
}
});
return {
get value() {
return value;
},
write(chunk) {
lineLogger.write(chunk);
},
flush() {
lineLogger.flush();
},
};
}
function createRelayInputBranch(stream, onError) {
let accepting = true;
let ending = false;
let drainPending = false;
let queue = [];
let queueBytes = 0;
let peakBytes = 0;
let waiters = [];
stream.on('drain', () => {
drainPending = false;
flush();
});
stream.on('error', (error) => {
accepting = false;
notifyWaiters();
onError(error);
});
return {
get queueBytes() {
return queueBytes;
},
get peakBytes() {
return peakBytes;
},
write(chunk) {
if (!accepting || stream.destroyed) {
return false;
}
if (queue.length === 0 && !drainPending) {
return writeNow(chunk);
}
queue.push(chunk);
queueBytes += chunk.length;
peakBytes = Math.max(peakBytes, queueBytes);
notifyWaiters();
return queueBytes <= MAX_RELAY_BRANCH_QUEUE_BYTES;
},
end() {
accepting = false;
ending = true;
flush();
},
destroy() {
accepting = false;
ending = false;
queue = [];
queueBytes = 0;
notifyWaiters();
if (!stream.destroyed) {
stream.destroy();
}
},
waitForCapacity() {
if (queueBytes <= RELAY_BRANCH_PAUSE_BYTES || !accepting || stream.destroyed) {
return Promise.resolve();
}
return new Promise((resolve) => {
waiters.push(resolve);
});
},
};
function writeNow(chunk) {
try {
drainPending = !stream.write(chunk);
return true;
} catch (error) {
accepting = false;
notifyWaiters();
onError(error);
return false;
}
}
function flush() {
while (queue.length > 0 && !drainPending && !stream.destroyed) {
const chunk = queue.shift();
queueBytes -= chunk.length;
notifyWaiters();
if (!writeNow(chunk)) {
return;
}
}
if (ending && queue.length === 0 && !drainPending && !stream.destroyed) {
stream.end();
}
}
function notifyWaiters() {
if (queueBytes > RELAY_BRANCH_PAUSE_BYTES && accepting && !stream.destroyed) {
return;
}
const currentWaiters = waiters;
waiters = [];
for (const resolve of currentWaiters) {
resolve();
}
}
}
async function waitForRelayCapacity(branches, shouldStop) {
while (!shouldStop() && branches.some((branch) => branch.queueBytes > RELAY_BRANCH_PAUSE_BYTES)) {
await Promise.race(branches.map((branch) => branch.waitForCapacity()));
}
}
function createLineLogger(callback) {
let buffer = '';
@@ -851,6 +1629,10 @@ function redactSecrets(text) {
return text.replace(/([?&](?:api_key|apikey|access_token|token|key)=)[^&\s]+/gi, '$1[redacted]');
}
function isChildRunning(child) {
return Boolean(child && child.exitCode === null && child.signalCode === null);
}
function stopProcess(child) {
if (!child || child.exitCode !== null || child.signalCode !== null) {
return;