big backend refactor

This commit is contained in:
2026-06-13 12:02:22 -07:00
parent 7436544a69
commit 297b053a91
15 changed files with 1768 additions and 1068 deletions

View File

@@ -1,15 +1,10 @@
import { performance } from "node:perf_hooks";
import { prisma } from "../db.js";
import { anthropicClient, hermesAgentClient, openaiClient, xaiClient } from "./providers.js";
import {
buildToolLogMessageData,
normalizeEnabledChatTools,
runPlainChatCompletionsStream,
runToolAwareChatCompletionsStream,
runToolAwareOpenAIChatStream,
type ToolExecutionEvent,
} from "./chat-tools.js";
import { buildAnthropicConversationMessage, getAnthropicSystemPrompt } from "./message-content.js";
import { getProviderChatAdapter } from "./provider-adapters.js";
import { toPrismaProvider } from "./provider-ids.js";
import type { MultiplexRequest, Provider } from "./types.js";
@@ -75,119 +70,48 @@ export async function* runMultiplexStream(req: MultiplexRequest): AsyncGenerator
let raw: unknown = { streamed: true };
try {
if (req.provider === "openai" || req.provider === "xai" || req.provider === "hermes-agent") {
const client = req.provider === "openai" ? openaiClient() : req.provider === "xai" ? xaiClient() : hermesAgentClient();
const enabledTools = normalizeEnabledChatTools(req.enabledTools);
const streamEvents =
req.provider === "openai" && enabledTools.length > 0
? runToolAwareOpenAIChatStream({
client,
model: req.model,
messages: req.messages,
enabledTools,
userLocation: req.userLocation,
temperature: req.temperature,
maxTokens: req.maxTokens,
logContext: {
provider: req.provider,
model: req.model,
chatId: chatId ?? undefined,
},
})
: req.provider === "hermes-agent" || enabledTools.length === 0
? runPlainChatCompletionsStream({
client,
model: req.model,
messages: req.messages,
userLocation: req.userLocation,
temperature: req.temperature,
maxTokens: req.maxTokens,
logContext: {
provider: req.provider,
model: req.model,
chatId: chatId ?? undefined,
},
})
: runToolAwareChatCompletionsStream({
client,
model: req.model,
messages: req.messages,
enabledTools,
userLocation: req.userLocation,
temperature: req.temperature,
maxTokens: req.maxTokens,
logContext: {
provider: req.provider,
model: req.model,
chatId: chatId ?? undefined,
},
});
for await (const ev of streamEvents) {
if (ev.type === "delta") {
text += ev.text;
yield { type: "delta", text: ev.text };
continue;
}
if (ev.type === "tool_call") {
if (ev.event.status !== "initiated" && shouldPersist && chatId) {
const toolMessage = buildToolLogMessageData(chatId, ev.event);
await prisma.message.create({
data: {
chatId: toolMessage.chatId,
role: toolMessage.role as any,
content: toolMessage.content,
name: toolMessage.name,
metadata: toolMessage.metadata as any,
},
});
}
yield { type: "tool_call", event: ev.event };
continue;
}
raw = ev.result.raw;
usage = ev.result.usage;
text = ev.result.text;
}
} else if (req.provider === "anthropic") {
const client = anthropicClient();
const system = getAnthropicSystemPrompt(req.messages, req.userLocation);
const msgs = req.messages.filter((message) => message.role !== "system").map((message) => buildAnthropicConversationMessage(message));
const stream = await client.messages.create({
const adapter = getProviderChatAdapter(req.provider);
const streamEvents = adapter.stream({
model: req.model,
messages: req.messages,
enabledTools: req.enabledTools,
userLocation: req.userLocation,
temperature: req.temperature,
maxTokens: req.maxTokens,
logContext: {
provider: req.provider,
model: req.model,
system,
max_tokens: req.maxTokens ?? 1024,
temperature: req.temperature,
messages: msgs as any,
stream: true,
});
chatId: chatId ?? undefined,
},
});
for await (const ev of stream as any as AsyncIterable<any>) {
// Anthropic streaming events include content_block_delta with text_delta
if (ev?.type === "content_block_delta" && ev?.delta?.type === "text_delta") {
const delta = ev.delta.text ?? "";
if (delta) {
text += delta;
yield { type: "delta", text: delta };
}
}
// capture usage if present on message_delta
if (ev?.type === "message_delta" && ev?.usage) {
usage = {
inputTokens: ev.usage.input_tokens,
outputTokens: ev.usage.output_tokens,
totalTokens:
(ev.usage.input_tokens ?? 0) + (ev.usage.output_tokens ?? 0),
};
}
// some streams end with message_stop
for await (const ev of streamEvents) {
if (ev.type === "delta") {
text += ev.text;
yield { type: "delta", text: ev.text };
continue;
}
raw = { streamed: true, provider: "anthropic" };
} else {
throw new Error(`unknown provider: ${req.provider}`);
if (ev.type === "tool_call") {
if (ev.event.status !== "initiated" && shouldPersist && chatId) {
const toolMessage = buildToolLogMessageData(chatId, ev.event);
await prisma.message.create({
data: {
chatId: toolMessage.chatId,
role: toolMessage.role as any,
content: toolMessage.content,
name: toolMessage.name,
metadata: toolMessage.metadata as any,
},
});
}
yield { type: "tool_call", event: ev.event };
continue;
}
raw = ev.result.raw;
usage = ev.result.usage;
text = ev.result.text;
}
const latencyMs = Math.round(performance.now() - t0);