1117 lines
38 KiB
TypeScript
1117 lines
38 KiB
TypeScript
import { performance } from "node:perf_hooks";
|
|
import { z } from "zod";
|
|
import type { FastifyInstance, FastifyReply, FastifyRequest } from "fastify";
|
|
import { ActiveSseStream, type SseStreamEvent } from "./active-streams.js";
|
|
import { prisma } from "./db.js";
|
|
import { requireAdmin } from "./auth.js";
|
|
import { env } from "./env.js";
|
|
import { buildComparableAttachments } from "./llm/message-content.js";
|
|
import { runMultiplex } from "./llm/multiplexer.js";
|
|
import { runMultiplexStream, type StreamEvent } from "./llm/streaming.js";
|
|
import { getModelCatalogSnapshot } from "./llm/model-catalog.js";
|
|
import { openaiClient } from "./llm/providers.js";
|
|
import { serializeProviderFields, toPrismaProvider } from "./llm/provider-ids.js";
|
|
import { exaClient } from "./search/exa.js";
|
|
import type { ChatAttachment } from "./llm/types.js";
|
|
|
|
/** Zod enum of the provider identifiers accepted in request bodies. */
const ProviderSchema = z.enum([
  "openai",
  "anthropic",
  "xai",
  "hermes-agent",
]);
|
|
|
|
/**
 * A chat message as supplied by a caller, before it is written to the
 * database.
 */
type IncomingChatMessage = {
  /** Conversation role of the message. */
  role: "system" | "user" | "assistant" | "tool";
  /** Text body of the message. */
  content: string;
  /** Optional name attached to the message. */
  name?: string;
  /** Optional attachments that accompany the message. */
  attachments?: Array<ChatAttachment>;
};
|
|
|
|
/**
 * Reports whether a stored message (`a`) and an incoming message (`b`) are
 * equivalent.
 *
 * Two messages match when role, content and name (with `undefined` treated as
 * `null`) are equal and their attachments serialize to the same JSON. The
 * stored side reads attachments from `metadata.attachments` and passes them
 * through `buildComparableAttachments`; the incoming side is serialized as-is
 * (an empty array when absent).
 *
 * @param a Message row loaded from storage.
 * @param b Message received from the caller.
 * @returns `true` when every compared field matches.
 */
function sameMessage(
  a: { role: string; content: string; name?: string | null; metadata?: unknown },
  b: { role: string; content: string; name?: string | null; attachments?: ChatAttachment[] }
) {
  // Serialize both attachment lists up front so the comparison is a plain string equality.
  const storedMetadata = a.metadata as Record<string, unknown> | null;
  const storedAttachmentsJson = JSON.stringify(buildComparableAttachments(storedMetadata?.attachments ?? null));
  const incomingAttachmentsJson = JSON.stringify(b.attachments ?? []);

  if (a.role !== b.role) return false;
  if (a.content !== b.content) return false;
  if ((a.name ?? null) !== (b.name ?? null)) return false;
  return storedAttachmentsJson === incomingAttachmentsJson;
}
|
|
|
|
/**
 * Checks whether a metadata value marks a tool-call log entry.
 *
 * @param value Arbitrary metadata value.
 * @returns `true` only for a non-null, non-array object whose `kind` property
 *   is exactly `"tool_call"`.
 */
function isToolCallLogMetadata(value: unknown) {
  const isPlainRecord = typeof value === "object" && value !== null && !Array.isArray(value);
  if (!isPlainRecord) return false;
  return (value as Record<string, unknown>).kind === "tool_call";
}
|
|
|
|
/**
 * Checks whether a message is a tool-call log entry: its role must be
 * `"tool"` and its metadata must satisfy `isToolCallLogMetadata`.
 */
function isToolCallLogMessage(message: { role: string; metadata: unknown }) {
  if (message.role !== "tool") return false;
  return isToolCallLogMetadata(message.metadata);
}
|
|
|
|
/**
 * Persists the non-assistant messages of an incoming conversation that are
 * not already stored for the chat.
 *
 * The stored history (minus assistant messages and tool-call log entries) is
 * compared with the incoming non-assistant messages from the start:
 * - if every incoming message is already present, nothing is written;
 * - if the stored history is a strict prefix of the incoming list, only the
 *   new tail is inserted;
 * - otherwise (the histories diverge) all incoming messages are inserted.
 *
 * @param chatId Chat the messages belong to.
 * @param messages Full incoming message list; assistant messages are ignored.
 */
async function storeNonAssistantMessages(chatId: string, messages: IncomingChatMessage[]) {
  const incoming = messages.filter((message) => message.role !== "assistant");
  if (incoming.length === 0) return;

  const stored = await prisma.message.findMany({
    where: { chatId },
    orderBy: { createdAt: "asc" },
    select: { role: true, content: true, name: true, metadata: true },
  });
  const storedComparable = stored.filter((row) => row.role !== "assistant" && !isToolCallLogMessage(row));

  // Length of the leading run on which stored and incoming messages agree.
  const limit = Math.min(storedComparable.length, incoming.length);
  let sharedPrefix = 0;
  for (; sharedPrefix < limit; sharedPrefix += 1) {
    if (!sameMessage(storedComparable[sharedPrefix], incoming[sharedPrefix])) break;
  }

  if (sharedPrefix === incoming.length) return;

  const storedIsPrefixOfIncoming = sharedPrefix === storedComparable.length;
  const toInsert = storedIsPrefixOfIncoming ? incoming.slice(storedComparable.length) : incoming;
  if (toInsert.length === 0) return;

  const rows = toInsert.map((message) => ({
    chatId,
    role: message.role as any,
    content: message.content,
    name: message.name,
    // Attachments are stored under metadata.attachments; omitted when there are none.
    metadata: message.attachments?.length ? ({ attachments: message.attachments } as any) : undefined,
  }));
  await prisma.message.createMany({ data: rows });
}
|
|
|
|
/** Upper bound on the number of attachments accepted on one message. */
const MAX_CHAT_ATTACHMENTS = 8;

/** Largest declared size, in bytes, of an image attachment (6 MiB). */
const MAX_IMAGE_ATTACHMENT_BYTES = 6 * 1024 ** 2;

/** Maximum number of characters of inline text in a text attachment. */
const MAX_TEXT_ATTACHMENT_CHARS = 200_000;

/**
 * Maximum length of an image data URL string.
 * NOTE(review): presumably sized to fit a 6 MiB image after base64 expansion
 * (about 8.39M characters) plus the `data:` prefix — confirm.
 */
const MAX_IMAGE_DATA_URL_CHARS = 8_500_000;
|
|
|
|
/**
 * Validation schema for one chat attachment, discriminated on `kind`.
 *
 * - `image`: PNG or JPEG supplied inline as a base64 data URL.
 * - `text`: text content supplied inline, optionally flagged as truncated.
 */
const ChatAttachmentSchema = z.discriminatedUnion("kind", [
  // Image attachment.
  z.object({
    kind: z.literal("image"),
    id: z.string().trim().min(1).max(128),
    filename: z.string().trim().min(1).max(255),
    mimeType: z.enum(["image/png", "image/jpeg"]),
    sizeBytes: z.number().int().positive().max(MAX_IMAGE_ATTACHMENT_BYTES),
    // Only the URL's shape is checked here; it is not cross-checked against mimeType.
    dataUrl: z
      .string()
      .max(MAX_IMAGE_DATA_URL_CHARS)
      .regex(/^data:image\/(?:png|jpeg);base64,[a-z0-9+/=\s]+$/i, "Invalid image data URL"),
  }),
  // Text attachment.
  z.object({
    kind: z.literal("text"),
    id: z.string().trim().min(1).max(128),
    filename: z.string().trim().min(1).max(255),
    mimeType: z.string().trim().min(1).max(127),
    // Declared size limit is 8 MiB; the inline text itself is capped separately below.
    sizeBytes: z.number().int().positive().max(8 * 1024 * 1024),
    text: z.string().max(MAX_TEXT_ATTACHMENT_CHARS),
    truncated: z.boolean().optional(),
  }),
]);
|
|
|
|
/**
 * Validation schema for one message of a completion request.
 *
 * Beyond the field checks, messages with role `"tool"` are rejected when they
 * carry at least one attachment.
 */
const CompletionMessageSchema = z
  .object({
    role: z.enum(["system", "user", "assistant", "tool"]),
    content: z.string(),
    name: z.string().optional(),
    attachments: z.array(ChatAttachmentSchema).max(MAX_CHAT_ATTACHMENTS).optional(),
  })
  .superRefine((message, ctx) => {
    const hasAttachments = Boolean(message.attachments?.length);
    if (!hasAttachments || message.role !== "tool") return;

    ctx.addIssue({
      code: z.ZodIssueCode.custom,
      message: "Tool messages cannot include attachments.",
      path: ["attachments"],
    });
  });
|
|
|
|
/**
 * Validation schema for the body of a streaming completion request.
 *
 * `chatId` and `persist` are optional, but supplying a `chatId` together with
 * `persist: false` is rejected.
 */
const CompletionStreamBody = z
  .object({
    chatId: z.string().optional(),
    persist: z.boolean().optional(),
    provider: ProviderSchema,
    model: z.string().min(1),
    messages: z.array(CompletionMessageSchema),
    temperature: z.number().min(0).max(2).optional(),
    maxTokens: z.number().int().positive().optional(),
  })
  .superRefine((body, ctx) => {
    const persistenceDisabled = body.persist === false;
    if (!persistenceDisabled || !body.chatId) return;

    ctx.addIssue({
      code: z.ZodIssueCode.custom,
      message: "chatId must be omitted when persist is false",
      path: ["chatId"],
    });
  });
|
|
|
|
/**
 * Returns metadata with the given attachments stored under `attachments`.
 *
 * @param metadata Existing metadata value of any shape.
 * @param attachments Attachments to merge in.
 * @returns The metadata unchanged when there are no attachments; otherwise a
 *   new object containing `attachments`, preceded by the existing metadata's
 *   own properties when the metadata is a non-array object. Any existing
 *   `attachments` key is overwritten.
 */
function mergeAttachmentsIntoMetadata(metadata: unknown, attachments?: ChatAttachment[]) {
  if (!attachments || attachments.length === 0) return metadata as any;

  const isRecord = typeof metadata === "object" && metadata !== null && !Array.isArray(metadata);
  return isRecord ? { ...(metadata as Record<string, unknown>), attachments } : { attachments };
}
|
|
|
|
/**
 * Validation schema for the body of a search run request. Every field is
 * optional; string values are trimmed and must be non-empty when present.
 */
const SearchRunBody = z.object({
  // What to search for, and the title to store on the search.
  query: z.string().trim().min(1).optional(),
  title: z.string().trim().min(1).optional(),

  // Search tuning options.
  type: z.enum(["auto", "fast", "deep", "instant"]).optional(),
  numResults: z.number().int().min(1).max(25).optional(),

  // Domain allow/deny lists, at most 50 entries each.
  includeDomains: z.array(z.string().trim().min(1)).max(50).optional(),
  excludeDomains: z.array(z.string().trim().min(1)).max(50).optional(),
});
|
|
|
|
/**
 * Converts one raw search result into the row shape written to the
 * `searchResult` table.
 *
 * Missing optional fields become `null`; `highlights` and `highlightScores`
 * are kept only when they are arrays, and `score` only when it is a number.
 *
 * @param searchId Id of the owning search.
 * @param result Raw result object from the search provider.
 * @param index Zero-based position of the result, stored as `rank`.
 */
function mapSearchResultRow(searchId: string, result: any, index: number) {
  const arrayOrNull = (value: any): any => (Array.isArray(value) ? value : null);
  const { title, url, publishedDate, author, text, score, favicon, image } = result;

  return {
    searchId,
    rank: index,
    title: title ?? null,
    url,
    publishedDate: publishedDate ?? null,
    author: author ?? null,
    text: text ?? null,
    highlights: arrayOrNull(result.highlights),
    highlightScores: arrayOrNull(result.highlightScores),
    score: typeof score === "number" ? score : null,
    favicon: favicon ?? null,
    image: image ?? null,
  };
}
|
|
|
|
/**
 * Converts one raw search result into a preview object for streaming to the
 * client before anything is persisted.
 *
 * The preview gets a synthetic id (`preview-result-<index>`) and a
 * `createdAt` timestamp taken at call time. The remaining fields are
 * normalized like stored rows: missing values become `null`, `highlights` and
 * `highlightScores` survive only as arrays, and `score` only as a number.
 *
 * @param result Raw result object from the search provider.
 * @param index Zero-based position of the result, used for `id` and `rank`.
 */
function mapSearchResultPreview(result: any, index: number) {
  const arrayOrNull = (value: any): any => (Array.isArray(value) ? value : null);
  const { title, url, publishedDate, author, text, score, favicon, image } = result;

  return {
    id: `preview-result-${index}`,
    createdAt: new Date().toISOString(),
    rank: index,
    title: title ?? null,
    url,
    publishedDate: publishedDate ?? null,
    author: author ?? null,
    text: text ?? null,
    highlights: arrayOrNull(result.highlights),
    highlightScores: arrayOrNull(result.highlightScores),
    score: typeof score === "number" ? score : null,
    favicon: favicon ?? null,
    image: image ?? null,
  };
}
|
|
|
|
/**
 * Trims a piece of context text and shortens it so the result is never longer
 * than `maxLength` characters.
 *
 * When the trimmed text is too long it is cut and a three-character `"..."`
 * marker is appended; the cut leaves room for the marker so the total still
 * fits in `maxLength`. (Previously only one character was reserved for the
 * three-character marker, so results could exceed the limit by two.)
 *
 * @param value Text to shorten; may be `null` or `undefined`.
 * @param maxLength Maximum length of the returned string.
 * @returns The trimmed (and possibly shortened) text, or `null` when the
 *   input is missing, blank, or nothing fits within `maxLength`.
 */
function truncateContextPart(value: string | null | undefined, maxLength: number) {
  const trimmed = value?.trim();
  if (!trimmed) return null;
  if (trimmed.length <= maxLength) return trimmed;

  const ellipsis = "...";
  if (maxLength <= ellipsis.length) {
    // Too small to hold any text plus the marker: hard-cut without a marker.
    return trimmed.slice(0, Math.max(0, maxLength)) || null;
  }

  // Drop whitespace exposed by the cut so the marker follows the text directly.
  return `${trimmed.slice(0, maxLength - ellipsis.length).trimEnd()}${ellipsis}`;
}
|
|
|
|
/**
 * Extracts display text from an answer response.
 *
 * @param answerResponse Response object, possibly `null`/`undefined`.
 * @returns The `answer` itself when it is a string (including `""`), a
 *   pretty-printed JSON rendering when it is any other truthy value, and
 *   `null` otherwise.
 */
function parseAnswerText(answerResponse: any) {
  const answer = answerResponse?.answer;
  if (typeof answer === "string") return answer;
  return answer ? JSON.stringify(answer, null, 2) : null;
}
|
|
|
|
/**
 * Cleans up a model-suggested chat title.
 *
 * The raw text is flattened to one line, stripped of surrounding quotes,
 * backticks and whitespace, and has internal whitespace collapsed. If nothing
 * remains, `fallback` is used instead. The result is then limited to its
 * first four words and 64 characters.
 *
 * @param raw Title text as returned by the model.
 * @param fallback Title to use when `raw` yields nothing usable.
 * @returns The normalized title, or `fallback` if normalization ends up empty.
 */
function normalizeSuggestedTitle(raw: string, fallback: string) {
  const singleLine = raw.replace(/\r?\n+/g, " ");
  const unwrapped = singleLine.replace(/^['"`\s]+|['"`\s]+$/g, "");
  const collapsed = unwrapped.replace(/\s+/g, " ").trim();

  const candidate = collapsed || fallback;
  const leadingWords = candidate.split(/\s+/).filter(Boolean).slice(0, 4);
  const title = leadingWords.join(" ").slice(0, 64).trim();

  return title || fallback;
}
|
|
|
|
/**
 * Asks the OpenAI Responses API for a short title describing a user request.
 *
 * @param content The user's request text, embedded verbatim in the prompt.
 * @returns The model's raw text output, or `""` when none is returned.
 */
async function generateChatTitle(content: string) {
  const instructions =
    "You create short chat titles. Return exactly one line, maximum 4 words, no quotes, no trailing punctuation.";
  const input = `User request:\n${content}\n\nTitle:`;

  const { output_text: titleText } = await openaiClient().responses.create({
    model: "gpt-4.1-mini",
    temperature: 0,
    max_output_tokens: 20,
    instructions,
    input,
    store: false,
  });

  return titleText ?? "";
}
|
|
|
|
/**
 * Produces a canonical form of a URL for use as a lookup key.
 *
 * Parseable URLs are serialized without their fragment and without a single
 * trailing slash. Input that cannot be parsed as a URL is trimmed and has one
 * trailing slash removed instead.
 *
 * @param input URL string; may be `null`, `undefined` or empty.
 * @returns The normalized string, or `""` for missing input.
 */
function normalizeUrlForMatch(input: string | null | undefined) {
  if (!input) return "";

  let parsed: URL;
  try {
    parsed = new URL(input);
  } catch {
    // Not an absolute URL: fall back to a purely textual normalization.
    return input.trim().replace(/\/$/, "");
  }

  parsed.hash = "";
  const href = parsed.toString();
  return href.endsWith("/") ? href.slice(0, -1) : href;
}
|
|
|
|
/**
 * Builds the system-prompt text used to seed a chat from a saved search.
 *
 * The text contains, in order: a fixed instruction paragraph, the search
 * query (falling back to the title, then to "Untitled search"), the answer
 * text if any, up to 8 answer citations, and up to 10 results with their URL,
 * source detail, text and at most two highlights each. Every piece is
 * shortened with `truncateContextPart`.
 *
 * @param search Search record; expected to carry `query`, `title`,
 *   `answerText`, `answerCitations` and `results` (each result with `rank`).
 * @returns The context as a single newline-joined string.
 */
function buildSearchChatContext(search: any) {
  const query =
    truncateContextPart(search.query, 500) ?? truncateContextPart(search.title, 500) ?? "Untitled search";

  const lines: string[] = [
    "You are Sybil. The user started this chat from a saved web search. Use the search answer and result context below when answering follow-up questions. If the context is insufficient, say so and use available tools when appropriate.",
    "",
    `Search query: ${query}`,
  ];

  const answer = truncateContextPart(search.answerText, 6000);
  if (answer) lines.push("", "Search answer:", answer);

  const citations: any[] = Array.isArray(search.answerCitations) ? search.answerCitations : [];
  if (citations.length) {
    lines.push("", "Answer citations:");
    citations.slice(0, 8).forEach((citation: any, index: number) => {
      const title = truncateContextPart(citation?.title, 160);
      const url = truncateContextPart(citation?.url ?? citation?.id, 400);
      // Skip citations that have neither a title nor a locator.
      if (!title && !url) return;
      lines.push(`${index + 1}. ${[title, url].filter(Boolean).join(" - ")}`);
    });
  }

  const results: any[] = Array.isArray(search.results) ? search.results : [];
  if (results.length) {
    lines.push("", "Search results:");
    for (const result of results.slice(0, 10)) {
      const title = truncateContextPart(result.title, 180) ?? result.url;
      const url = truncateContextPart(result.url, 500);
      const published = truncateContextPart(result.publishedDate, 80);
      const author = truncateContextPart(result.author, 120);
      const text = truncateContextPart(result.text, 1000);

      // Non-string and blank highlights are dropped before the first two are taken.
      const rawHighlights: unknown[] = Array.isArray(result.highlights) ? result.highlights : [];
      const highlights = rawHighlights
        .map((highlight) => truncateContextPart(typeof highlight === "string" ? highlight : null, 360))
        .filter(Boolean);

      lines.push(`${result.rank + 1}. ${title}`);
      if (url) lines.push(` URL: ${url}`);
      if (published || author) lines.push(` Source detail: ${[published, author].filter(Boolean).join(" - ")}`);
      if (text) lines.push(` Text: ${text}`);
      highlights.slice(0, 2).forEach((highlight) => {
        lines.push(` Highlight: ${highlight}`);
      });
    }
  }

  return lines.join("\n");
}
|
|
|
|
/**
 * Builds the response headers for a server-sent-events stream.
 *
 * The request's `Origin` is echoed in `Access-Control-Allow-Origin` when one
 * is present and is not the literal string `"null"`; otherwise `*` is used.
 * `Access-Control-Allow-Credentials: true` is added only for a non-wildcard
 * origin.
 *
 * NOTE(review): any caller-supplied origin is reflected together with
 * credentials. Confirm that origin allow-listing happens elsewhere.
 *
 * @param originHeader Value of the request's `Origin` header, if any.
 * @returns Header name/value pairs for the SSE response.
 */
function buildSseHeaders(originHeader: string | undefined) {
  const origin = !originHeader || originHeader === "null" ? "*" : originHeader;

  const headers: Record<string, string> = {
    "Content-Type": "text/event-stream; charset=utf-8",
    "Cache-Control": "no-cache, no-transform",
    Connection: "keep-alive",
    "X-Accel-Buffering": "no",
    "Access-Control-Allow-Origin": origin,
    Vary: "Origin",
    ...(origin !== "*" ? { "Access-Control-Allow-Credentials": "true" } : {}),
  };

  return headers;
}
|
|
|
|
/** Parsed shape of a search run request body, derived from `SearchRunBody`. */
type SearchRunRequest = z.infer<typeof SearchRunBody>;

/** In-progress chat streams, keyed by chat id. */
const activeChatStreams: Map<string, ActiveSseStream> = new Map();

/** In-progress search streams, keyed by search id. */
const activeSearchStreams: Map<string, ActiveSseStream> = new Map();
|
|
|
|
/**
 * Turns a caught value into a message string: the `message` of an `Error`
 * instance, or the `String(...)` conversion of anything else.
 */
function getErrorMessage(err: unknown) {
  if (err instanceof Error) return err.message;
  return String(err);
}
|
|
|
|
/**
 * Writes one server-sent event to the raw response.
 *
 * The event is sent as an `event:` line followed by a `data:` line holding
 * the JSON-encoded payload and a blank line. Nothing is written when the
 * response is already destroyed or ended.
 *
 * @param reply Reply whose raw response receives the event.
 * @param event Event name and payload to send.
 */
function writeSseEvent(reply: FastifyReply, event: SseStreamEvent) {
  const { raw } = reply;
  const writable = !raw.destroyed && !raw.writableEnded;
  if (!writable) return;

  raw.write(`event: ${event.event}\n`);
  raw.write(`data: ${JSON.stringify(event.data)}\n\n`);
}
|
|
|
|
/**
 * Pipes an active run's events to one HTTP client as server-sent events.
 *
 * Sends the SSE headers, subscribes to the stream, and then waits until the
 * run finishes or the client connection closes, whichever comes first. The
 * subscription is always released; the response is ended only when the run
 * finished while the connection was still open.
 *
 * @param req Incoming request; only its `Origin` header is read.
 * @param reply Reply whose raw response carries the event stream.
 * @param stream The run to relay.
 * @returns The same `reply` object.
 */
async function streamActiveRun(req: FastifyRequest, reply: FastifyReply, stream: ActiveSseStream) {
  const { origin } = req.headers;
  reply.raw.writeHead(200, buildSseHeaders(typeof origin === "string" ? origin : undefined));
  reply.raw.flushHeaders?.();

  // Replaced with the real unsubscribe once subscribed; a no-op until then so
  // an early "close" is harmless.
  let unsubscribe = () => {};
  let closed = false;

  const closedPromise = new Promise<void>((resolve) => {
    function onClose() {
      closed = true;
      unsubscribe();
      reply.raw.off("close", onClose);
      resolve();
    }

    reply.raw.on("close", onClose);
    // Once the run is over, the close listener is no longer needed.
    stream.done.finally(() => {
      reply.raw.off("close", onClose);
    });
  });

  unsubscribe = stream.subscribe((event) => writeSseEvent(reply, event));
  await Promise.race([stream.done, closedPromise]);
  unsubscribe();

  if (closed) return reply;
  if (reply.raw.destroyed || reply.raw.writableEnded) return reply;

  reply.raw.end();
  return reply;
}
|
|
|
|
/**
 * Converts a multiplexer stream event into the SSE event sent to clients.
 *
 * `tool_call` events are unwrapped so that only their inner `event` becomes
 * the payload; every other event is forwarded whole under its own `type`.
 */
function mapChatStreamEvent(ev: StreamEvent): SseStreamEvent {
  switch (ev.type) {
    case "tool_call":
      return { event: "tool_call", data: ev.event };
    default:
      return { event: ev.type, data: ev };
  }
}
|
|
|
|
/**
 * Starts a chat completion in the background and exposes it as an
 * `ActiveSseStream` registered under `chatId`.
 *
 * Events from `runMultiplexStream` are forwarded to the stream until a `done`
 * or `error` event arrives, which completes it. If the source ends without
 * such an event, or throws, the stream is completed with an `error` event.
 * The registry entry is removed when the background task ends.
 *
 * @param chatId Key under which the stream is registered.
 * @param body Validated completion request passed to the multiplexer.
 * @returns The newly created stream, returned before the run has finished.
 */
function startActiveChatStream(chatId: string, body: z.infer<typeof CompletionStreamBody>) {
  const stream = new ActiveSseStream();
  activeChatStreams.set(chatId, stream);

  const pump = async () => {
    try {
      let finished = false;

      for await (const ev of runMultiplexStream(body)) {
        const mapped = mapChatStreamEvent(ev);
        const isTerminal = ev.type === "done" || ev.type === "error";

        if (!isTerminal) {
          stream.emit(mapped.event, mapped.data);
          continue;
        }

        finished = true;
        stream.complete(mapped);
        break;
      }

      if (!finished) {
        stream.complete({ event: "error", data: { message: "chat stream ended unexpectedly" } });
      }
    } catch (err) {
      stream.complete({ event: "error", data: { message: getErrorMessage(err) } });
    } finally {
      activeChatStreams.delete(chatId);
    }
  };

  // Fire and forget: failures are reported through the stream itself.
  void pump();

  return stream;
}
|
|
|
|
/**
 * Runs a search in the background, streaming progress through `stream` and
 * persisting the outcome on the search row.
 *
 * Flow:
 * 1. The search and answer requests are started together.
 * 2. When the search returns, a `search_results` event with preview rows is
 *    emitted. Page contents are then fetched, and on success a second
 *    `search_results` event with enriched rows is emitted. A failed search
 *    emits `search_error` instead.
 * 3. When the answer returns, an `answer` event is emitted; a failed answer
 *    emits `answer_error`.
 * 4. After both settle, the search row and its result rows are rewritten in
 *    one transaction and the stream completes with `done` (or `error` when
 *    the row can no longer be found).
 *
 * Any other failure is recorded on the search row (best effort) and completes
 * the stream with `error`. On every exit path, including the early
 * "query is required" rejection, the entry for `searchId` is removed from
 * `activeSearchStreams` so a finished stream is never left registered.
 *
 * @param searchId Id of the search row being run.
 * @param body Validated run options; `query` must be non-blank.
 * @param stream Stream that receives progress and the terminal event.
 */
async function executeSearchRunStream(searchId: string, body: SearchRunRequest, stream: ActiveSseStream) {
  const startedAt = performance.now();
  const query = body.query?.trim();
  if (!query) {
    stream.complete({ event: "error", data: { message: "query is required" } });
    // This return bypasses the try/finally below, so unregister here as well;
    // otherwise the completed stream would stay in the registry indefinitely.
    activeSearchStreams.delete(searchId);
    return;
  }

  const normalizedTitle = body.title?.trim() || query.slice(0, 80);

  try {
    const exa = exaClient();
    // Both requests are created before either is awaited so they run concurrently.
    const searchPromise = exa.search(query, {
      type: body.type ?? "auto",
      numResults: body.numResults ?? 10,
      includeDomains: body.includeDomains,
      excludeDomains: body.excludeDomains,
      moderation: true,
      userLocation: "US",
      contents: false,
    } as any);
    const answerPromise = exa.answer(query, {
      text: true,
      model: "exa",
      userLocation: "US",
    });

    // Outcome slots filled by the settle handlers below.
    let searchResponse: any | null = null;
    let answerResponse: any | null = null;
    let enrichedResults: any[] | null = null;
    let searchError: string | null = null;
    let answerError: string | null = null;

    const searchSettled = searchPromise.then(
      async (value) => {
        searchResponse = value;
        const previewResults = (value?.results ?? []).map((result: any, index: number) => mapSearchResultPreview(result, index));
        stream.emit("search_results", {
          requestId: value?.requestId ?? null,
          results: previewResults,
        });

        const urls = (value?.results ?? []).map((result: any) => result?.url).filter((url: string | undefined) => typeof url === "string");
        if (!urls.length) return;

        try {
          const contentsResponse = await exa.getContents(urls, {
            text: { maxCharacters: 1200 },
            highlights: {
              query,
              maxCharacters: 320,
              numSentences: 2,
              highlightsPerUrl: 2,
            },
          } as any);

          // Index fetched contents by normalized URL so they can be matched to results.
          const byUrl = new Map<string, any>();
          for (const contentItem of contentsResponse?.results ?? []) {
            byUrl.set(normalizeUrlForMatch(contentItem?.url), contentItem);
          }

          enrichedResults = (value?.results ?? []).map((result: any) => {
            const contentItem = byUrl.get(normalizeUrlForMatch(result?.url));
            if (!contentItem) return result;
            return {
              ...result,
              text: contentItem.text ?? result.text ?? null,
              highlights: Array.isArray(contentItem.highlights) ? contentItem.highlights : result.highlights ?? null,
              highlightScores: Array.isArray(contentItem.highlightScores) ? contentItem.highlightScores : result.highlightScores ?? null,
            };
          });

          stream.emit("search_results", {
            requestId: value?.requestId ?? null,
            results: enrichedResults.map((result: any, index: number) => mapSearchResultPreview(result, index)),
          });
        } catch {
          // keep preview results if content enrichment fails
        }
      },
      (reason) => {
        searchError = reason?.message ?? String(reason);
        stream.emit("search_error", { error: searchError });
      }
    );

    const answerSettled = answerPromise.then(
      (value) => {
        answerResponse = value;
        stream.emit("answer", {
          answerText: parseAnswerText(value),
          answerRequestId: value?.requestId ?? null,
          answerCitations: (value?.citations as any) ?? null,
        });
      },
      (reason) => {
        answerError = reason?.message ?? String(reason);
        stream.emit("answer_error", { error: answerError });
      }
    );

    // Both handlers convert provider rejections into recorded errors, so a
    // failed search or answer request does not reject here.
    await Promise.all([searchSettled, answerSettled]);

    const latencyMs = Math.round(performance.now() - startedAt);
    // Prefer enriched rows; fall back to the raw search results.
    const persistedResults = enrichedResults ?? searchResponse?.results ?? [];
    const rows = persistedResults.map((result: any, index: number) => mapSearchResultRow(searchId, result, index));
    const answerText = parseAnswerText(answerResponse);

    await prisma.$transaction(async (tx) => {
      await tx.search.update({
        where: { id: searchId },
        data: {
          query,
          title: normalizedTitle,
          requestId: searchResponse?.requestId ?? null,
          rawResponse: searchResponse as any,
          latencyMs,
          error: searchError,
          answerText,
          answerRequestId: answerResponse?.requestId ?? null,
          answerCitations: (answerResponse?.citations as any) ?? null,
          answerRawResponse: answerResponse as any,
          answerError,
        },
      });
      // Replace any previously stored results for this search.
      await tx.searchResult.deleteMany({ where: { searchId } });
      if (rows.length) {
        await tx.searchResult.createMany({ data: rows as any });
      }
    });

    const search = await prisma.search.findUnique({
      where: { id: searchId },
      include: { results: { orderBy: { rank: "asc" } } },
    });
    if (!search) {
      stream.complete({ event: "error", data: { message: "search not found" } });
    } else {
      stream.complete({ event: "done", data: { search } });
    }
  } catch (err) {
    const message = getErrorMessage(err);
    try {
      await prisma.search.update({
        where: { id: searchId },
        data: {
          query,
          title: normalizedTitle,
          latencyMs: Math.round(performance.now() - startedAt),
          error: message,
        },
      });
    } catch {
      // keep the stream terminal event even if the backing search row disappeared
    }
    stream.complete({ event: "error", data: { message } });
  } finally {
    activeSearchStreams.delete(searchId);
  }
}
|
|
|
|
export async function registerRoutes(app: FastifyInstance) {
|
|
app.get("/health", { logLevel: "silent" }, async () => ({ ok: true }));
|
|
|
|
app.get("/v1/auth/session", async (req) => {
|
|
requireAdmin(req);
|
|
return { authenticated: true, mode: env.ADMIN_TOKEN ? "token" : "open" };
|
|
});
|
|
|
|
app.get("/v1/models", async (req) => {
|
|
requireAdmin(req);
|
|
return { providers: getModelCatalogSnapshot() };
|
|
});
|
|
|
|
app.get("/v1/active-runs", async (req) => {
|
|
requireAdmin(req);
|
|
return {
|
|
chats: Array.from(activeChatStreams.keys()),
|
|
searches: Array.from(activeSearchStreams.keys()),
|
|
};
|
|
});
|
|
|
|
app.get("/v1/chats", async (req) => {
|
|
requireAdmin(req);
|
|
const chats = await prisma.chat.findMany({
|
|
orderBy: { updatedAt: "desc" },
|
|
take: 100,
|
|
select: {
|
|
id: true,
|
|
title: true,
|
|
createdAt: true,
|
|
updatedAt: true,
|
|
initiatedProvider: true,
|
|
initiatedModel: true,
|
|
lastUsedProvider: true,
|
|
lastUsedModel: true,
|
|
},
|
|
});
|
|
return { chats: chats.map((chat) => serializeProviderFields(chat)) };
|
|
});
|
|
|
|
app.post("/v1/chats", async (req) => {
|
|
requireAdmin(req);
|
|
const Body = z
|
|
.object({
|
|
title: z.string().optional(),
|
|
provider: ProviderSchema.optional(),
|
|
model: z.string().trim().min(1).optional(),
|
|
messages: z.array(CompletionMessageSchema).optional(),
|
|
})
|
|
.superRefine((value, ctx) => {
|
|
if (value.provider && !value.model) {
|
|
ctx.addIssue({
|
|
code: z.ZodIssueCode.custom,
|
|
message: "model is required when provider is supplied",
|
|
path: ["model"],
|
|
});
|
|
}
|
|
if (!value.provider && value.model) {
|
|
ctx.addIssue({
|
|
code: z.ZodIssueCode.custom,
|
|
message: "provider is required when model is supplied",
|
|
path: ["provider"],
|
|
});
|
|
}
|
|
});
|
|
const parsed = Body.safeParse(req.body ?? {});
|
|
if (!parsed.success) return app.httpErrors.badRequest(parsed.error.message);
|
|
const body = parsed.data;
|
|
const chat = await prisma.chat.create({
|
|
data: {
|
|
title: body.title,
|
|
initiatedProvider: body.provider ? (toPrismaProvider(body.provider) as any) : undefined,
|
|
initiatedModel: body.model,
|
|
lastUsedProvider: body.provider ? (toPrismaProvider(body.provider) as any) : undefined,
|
|
lastUsedModel: body.model,
|
|
messages: body.messages?.length
|
|
? {
|
|
create: body.messages.map((message) => ({
|
|
role: message.role as any,
|
|
content: message.content,
|
|
name: message.name,
|
|
metadata: message.attachments?.length ? ({ attachments: message.attachments } as any) : undefined,
|
|
})),
|
|
}
|
|
: undefined,
|
|
},
|
|
select: {
|
|
id: true,
|
|
title: true,
|
|
createdAt: true,
|
|
updatedAt: true,
|
|
initiatedProvider: true,
|
|
initiatedModel: true,
|
|
lastUsedProvider: true,
|
|
lastUsedModel: true,
|
|
},
|
|
});
|
|
return { chat: serializeProviderFields(chat) };
|
|
});
|
|
|
|
app.patch("/v1/chats/:chatId", async (req) => {
|
|
requireAdmin(req);
|
|
const Params = z.object({ chatId: z.string() });
|
|
const Body = z.object({ title: z.string().trim().min(1) });
|
|
const { chatId } = Params.parse(req.params);
|
|
const body = Body.parse(req.body ?? {});
|
|
|
|
const updated = await prisma.chat.updateMany({
|
|
where: { id: chatId },
|
|
data: { title: body.title },
|
|
});
|
|
|
|
if (updated.count === 0) return app.httpErrors.notFound("chat not found");
|
|
|
|
const chat = await prisma.chat.findUnique({
|
|
where: { id: chatId },
|
|
select: {
|
|
id: true,
|
|
title: true,
|
|
createdAt: true,
|
|
updatedAt: true,
|
|
initiatedProvider: true,
|
|
initiatedModel: true,
|
|
lastUsedProvider: true,
|
|
lastUsedModel: true,
|
|
},
|
|
});
|
|
if (!chat) return app.httpErrors.notFound("chat not found");
|
|
return { chat: serializeProviderFields(chat) };
|
|
});
|
|
|
|
app.post("/v1/chats/title/suggest", async (req) => {
|
|
requireAdmin(req);
|
|
const Body = z.object({
|
|
chatId: z.string(),
|
|
content: z.string().trim().min(1),
|
|
});
|
|
const body = Body.parse(req.body ?? {});
|
|
|
|
const existing = await prisma.chat.findUnique({
|
|
where: { id: body.chatId },
|
|
select: {
|
|
id: true,
|
|
title: true,
|
|
createdAt: true,
|
|
updatedAt: true,
|
|
initiatedProvider: true,
|
|
initiatedModel: true,
|
|
lastUsedProvider: true,
|
|
lastUsedModel: true,
|
|
},
|
|
});
|
|
if (!existing) return app.httpErrors.notFound("chat not found");
|
|
if (existing.title?.trim()) return { chat: serializeProviderFields(existing) };
|
|
|
|
const fallback = body.content.split(/\r?\n/)[0]?.trim().slice(0, 48) || "New chat";
|
|
const suggestedRaw = await generateChatTitle(body.content);
|
|
const title = normalizeSuggestedTitle(suggestedRaw, fallback);
|
|
|
|
const chat = await prisma.chat.update({
|
|
where: { id: body.chatId },
|
|
data: { title },
|
|
select: {
|
|
id: true,
|
|
title: true,
|
|
createdAt: true,
|
|
updatedAt: true,
|
|
initiatedProvider: true,
|
|
initiatedModel: true,
|
|
lastUsedProvider: true,
|
|
lastUsedModel: true,
|
|
},
|
|
});
|
|
|
|
return { chat: serializeProviderFields(chat) };
|
|
});
|
|
|
|
app.delete("/v1/chats/:chatId", async (req) => {
|
|
requireAdmin(req);
|
|
const Params = z.object({ chatId: z.string() });
|
|
const { chatId } = Params.parse(req.params);
|
|
|
|
req.log.info({ chatId }, "delete chat requested");
|
|
|
|
const result = await prisma.chat.deleteMany({ where: { id: chatId } });
|
|
if (result.count === 0) {
|
|
req.log.warn({ chatId }, "delete chat target not found");
|
|
return app.httpErrors.notFound("chat not found");
|
|
}
|
|
|
|
req.log.info({ chatId }, "chat deleted");
|
|
return { deleted: true };
|
|
});
|
|
|
|
app.get("/v1/searches", async (req) => {
|
|
requireAdmin(req);
|
|
const searches = await prisma.search.findMany({
|
|
orderBy: { updatedAt: "desc" },
|
|
take: 100,
|
|
select: { id: true, title: true, query: true, createdAt: true, updatedAt: true },
|
|
});
|
|
return { searches };
|
|
});
|
|
|
|
app.post("/v1/searches", async (req) => {
|
|
requireAdmin(req);
|
|
const Body = z.object({ title: z.string().optional(), query: z.string().optional() });
|
|
const body = Body.parse(req.body ?? {});
|
|
const title = body.title?.trim() || body.query?.trim()?.slice(0, 80);
|
|
const query = body.query?.trim() || null;
|
|
const search = await prisma.search.create({
|
|
data: {
|
|
title: title || null,
|
|
query,
|
|
},
|
|
select: { id: true, title: true, query: true, createdAt: true, updatedAt: true },
|
|
});
|
|
return { search };
|
|
});
|
|
|
|
app.delete("/v1/searches/:searchId", async (req) => {
|
|
requireAdmin(req);
|
|
const Params = z.object({ searchId: z.string() });
|
|
const { searchId } = Params.parse(req.params);
|
|
|
|
req.log.info({ searchId }, "delete search requested");
|
|
|
|
const result = await prisma.search.deleteMany({ where: { id: searchId } });
|
|
if (result.count === 0) {
|
|
req.log.warn({ searchId }, "delete search target not found");
|
|
return app.httpErrors.notFound("search not found");
|
|
}
|
|
req.log.info({ searchId }, "search deleted");
|
|
return { deleted: true };
|
|
});
|
|
|
|
app.get("/v1/searches/:searchId", async (req) => {
|
|
requireAdmin(req);
|
|
const Params = z.object({ searchId: z.string() });
|
|
const { searchId } = Params.parse(req.params);
|
|
|
|
const search = await prisma.search.findUnique({
|
|
where: { id: searchId },
|
|
include: { results: { orderBy: { rank: "asc" } } },
|
|
});
|
|
if (!search) return app.httpErrors.notFound("search not found");
|
|
return { search };
|
|
});
|
|
|
|
app.post("/v1/searches/:searchId/chat", async (req) => {
|
|
requireAdmin(req);
|
|
const Params = z.object({ searchId: z.string() });
|
|
const Body = z.object({ title: z.string().optional() });
|
|
const { searchId } = Params.parse(req.params);
|
|
const body = Body.parse(req.body ?? {});
|
|
|
|
const search = await prisma.search.findUnique({
|
|
where: { id: searchId },
|
|
include: { results: { orderBy: { rank: "asc" } } },
|
|
});
|
|
if (!search) return app.httpErrors.notFound("search not found");
|
|
|
|
const fallbackTitle = search.query?.trim() || search.title?.trim() || "Search results";
|
|
const title = body.title?.trim() || `Search: ${fallbackTitle.slice(0, 72)}`;
|
|
const context = buildSearchChatContext(search);
|
|
|
|
const chat = await prisma.chat.create({
|
|
data: {
|
|
title,
|
|
messages: {
|
|
create: {
|
|
role: "system" as any,
|
|
content: context,
|
|
metadata: {
|
|
kind: "search_context",
|
|
searchId: search.id,
|
|
query: search.query,
|
|
resultCount: search.results.length,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
select: {
|
|
id: true,
|
|
title: true,
|
|
createdAt: true,
|
|
updatedAt: true,
|
|
initiatedProvider: true,
|
|
initiatedModel: true,
|
|
lastUsedProvider: true,
|
|
lastUsedModel: true,
|
|
},
|
|
});
|
|
|
|
return { chat: serializeProviderFields(chat) };
|
|
});
|
|
|
|
// POST /v1/searches/:searchId/run
//
// Runs the search and answer requests for an existing search, waits for both,
// stores the outcome (search row plus result rows) in one transaction, and
// returns the refreshed search with its results.
//
// Responses:
// - 404 when the search does not exist.
// - 400 when neither the body nor the stored search supplies a query.
// - 502 when both the search and the answer request failed.
//
// On any thrown error the latency and message are recorded on the search row
// before the error is rethrown. That bookkeeping write is best-effort: if it
// fails itself (for example the row was deleted meanwhile), the failure is
// logged and the original error is still the one reported to the client.
app.post("/v1/searches/:searchId/run", async (req) => {
  requireAdmin(req);
  const Params = z.object({ searchId: z.string() });

  const { searchId } = Params.parse(req.params);
  const body = SearchRunBody.parse(req.body ?? {});

  const existing = await prisma.search.findUnique({
    where: { id: searchId },
    select: { id: true, query: true },
  });
  if (!existing) return app.httpErrors.notFound("search not found");

  // A query in the request body overrides the one stored on the search.
  const query = body.query?.trim() || existing.query?.trim();
  if (!query) return app.httpErrors.badRequest("query is required");

  const startedAt = performance.now();

  try {
    const exa = exaClient();
    // allSettled: one side failing must not discard the other side's result.
    const [searchOutcome, answerOutcome] = await Promise.allSettled([
      exa.searchAndContents(query, {
        type: body.type ?? "auto",
        numResults: body.numResults ?? 10,
        includeDomains: body.includeDomains,
        excludeDomains: body.excludeDomains,
        text: { maxCharacters: 1200 },
        highlights: {
          query,
          maxCharacters: 320,
          numSentences: 2,
          highlightsPerUrl: 2,
        },
        moderation: true,
        userLocation: "US",
      } as any),
      exa.answer(query, {
        text: true,
        model: "exa",
        userLocation: "US",
      }),
    ]);

    const searchResponse = searchOutcome.status === "fulfilled" ? searchOutcome.value : null;
    const answerResponse = answerOutcome.status === "fulfilled" ? answerOutcome.value : null;
    const searchError = searchOutcome.status === "rejected" ? searchOutcome.reason?.message ?? String(searchOutcome.reason) : null;
    const answerError = answerOutcome.status === "rejected" ? answerOutcome.reason?.message ?? String(answerOutcome.reason) : null;

    const latencyMs = Math.round(performance.now() - startedAt);
    const normalizedTitle = body.title?.trim() || query.slice(0, 80);
    const rows = (searchResponse?.results ?? []).map((result: any, index: number) => mapSearchResultRow(searchId, result, index));
    const answerText = parseAnswerText(answerResponse);

    await prisma.$transaction(async (tx) => {
      await tx.search.update({
        where: { id: searchId },
        data: {
          query,
          title: normalizedTitle,
          requestId: searchResponse?.requestId ?? null,
          rawResponse: searchResponse as any,
          latencyMs,
          error: searchError,
          answerText,
          answerRequestId: answerResponse?.requestId ?? null,
          answerCitations: (answerResponse?.citations as any) ?? null,
          answerRawResponse: answerResponse as any,
          answerError,
        },
      });
      // Replace any previously stored results for this search.
      await tx.searchResult.deleteMany({ where: { searchId } });
      if (rows.length) {
        await tx.searchResult.createMany({ data: rows as any });
      }
    });

    // Partial success is returned normally; only a total failure is an error.
    if (searchError && answerError) {
      throw app.httpErrors.badGateway(`Exa search and answer failed: ${searchError}; ${answerError}`);
    }

    const search = await prisma.search.findUnique({
      where: { id: searchId },
      include: { results: { orderBy: { rank: "asc" } } },
    });
    if (!search) return app.httpErrors.notFound("search not found");
    return { search };
  } catch (err: any) {
    try {
      await prisma.search.update({
        where: { id: searchId },
        data: {
          latencyMs: Math.round(performance.now() - startedAt),
          error: err?.message ?? String(err),
        },
      });
    } catch (persistErr) {
      // Recording the failure must never replace the failure itself.
      req.log.warn({ searchId, err: persistErr }, "failed to record search run error");
    }
    throw err;
  }
});
|
|
|
|
/**
 * POST /v1/searches/:searchId/run/stream
 *
 * Streams a run of a stored search. If a stream is already registered for
 * this search id, the request is served from that stream; otherwise a new
 * ActiveSseStream is registered and the run is kicked off in the background.
 *
 * The effective query is the trimmed `query` from the request body, falling
 * back to the trimmed query stored on the search row.
 *
 * Responses: 404 when the search row does not exist, 400 when neither source
 * yields a non-empty query.
 */
app.post("/v1/searches/:searchId/run/stream", async (req, reply) => {
  requireAdmin(req);
  const { searchId } = z.object({ searchId: z.string() }).parse(req.params);
  const runOptions = SearchRunBody.parse(req.body ?? {});

  const storedSearch = await prisma.search.findUnique({
    where: { id: searchId },
    select: { id: true, query: true },
  });
  if (!storedSearch) return app.httpErrors.notFound("search not found");

  // Prefer the query supplied with this request; otherwise reuse the stored one.
  const requestedQuery = runOptions.query?.trim();
  const query = requestedQuery ? requestedQuery : storedSearch.query?.trim();
  if (!query) return app.httpErrors.badRequest("query is required");

  // Get-or-create: there is no await between the lookup and the registration,
  // so two requests cannot both register a stream for the same search id.
  let stream = activeSearchStreams.get(searchId);
  if (!stream) {
    stream = new ActiveSseStream();
    activeSearchStreams.set(searchId, stream);
    // Fire-and-forget: the run is not awaited here; this request only
    // subscribes to the stream below.
    void executeSearchRunStream(searchId, { ...runOptions, query }, stream);
  }
  return streamActiveRun(req, reply, stream);
});
|
|
|
|
/**
 * POST /v1/searches/:searchId/run/stream/attach
 *
 * Attaches the caller to the stream currently registered for the given
 * search id. Never starts a run: responds 404 when no stream is registered.
 */
app.post("/v1/searches/:searchId/run/stream/attach", async (req, reply) => {
  requireAdmin(req);
  const { searchId } = z.object({ searchId: z.string() }).parse(req.params);

  const activeStream = activeSearchStreams.get(searchId);
  if (activeStream) {
    return streamActiveRun(req, reply, activeStream);
  }
  return app.httpErrors.notFound("active search stream not found");
});
|
|
|
|
/**
 * GET /v1/chats/:chatId
 *
 * Returns one chat with its messages (ordered by createdAt ascending) and its
 * calls (ordered by createdAt descending). The record is passed through
 * serializeProviderFields before being returned. Responds 404 when the chat
 * does not exist.
 */
app.get("/v1/chats/:chatId", async (req) => {
  requireAdmin(req);
  const { chatId } = z.object({ chatId: z.string() }).parse(req.params);

  const chatWithHistory = await prisma.chat.findUnique({
    where: { id: chatId },
    include: {
      messages: { orderBy: { createdAt: "asc" } },
      calls: { orderBy: { createdAt: "desc" } },
    },
  });

  if (chatWithHistory) {
    return { chat: serializeProviderFields(chatWithHistory) };
  }
  return app.httpErrors.notFound("chat not found");
});
|
|
|
|
/**
 * POST /v1/chats/:chatId/messages
 *
 * Appends a single message to an existing chat.
 *
 * Body: `role` (system | user | assistant | tool), `content`, optional `name`,
 * optional free-form `metadata`, and optional `attachments` (capped at
 * MAX_CHAT_ATTACHMENTS). Attachments are folded into the stored metadata via
 * mergeAttachmentsIntoMetadata.
 *
 * Responses: 400 when the body fails validation, 404 when the chat does not
 * exist, otherwise `{ message }` with the created row.
 */
app.post("/v1/chats/:chatId/messages", async (req) => {
  requireAdmin(req);
  const Params = z.object({ chatId: z.string() });
  const Body = z.object({
    role: z.enum(["system", "user", "assistant", "tool"]),
    content: z.string(),
    name: z.string().optional(),
    metadata: z.unknown().optional(),
    attachments: z.array(ChatAttachmentSchema).max(MAX_CHAT_ATTACHMENTS).optional(),
  });

  const { chatId } = Params.parse(req.params);
  const parsed = Body.safeParse(req.body);
  if (!parsed.success) return app.httpErrors.badRequest(parsed.error.message);
  const body = parsed.data;

  // Reject unknown chats explicitly, as the completion routes do. Without this
  // check the create below fails on the chat relation and the caller gets an
  // opaque server error instead of a 404.
  const chatExists = await prisma.chat.findUnique({ where: { id: chatId }, select: { id: true } });
  if (!chatExists) return app.httpErrors.notFound("chat not found");

  const msg = await prisma.message.create({
    data: {
      chatId,
      role: body.role as any,
      content: body.content,
      name: body.name,
      metadata: mergeAttachmentsIntoMetadata(body.metadata, body.attachments) as any,
    },
  });

  return { message: msg };
});
|
|
|
|
/**
 * POST /v1/chats/:chatId/stream/attach
 *
 * Attaches the caller to the stream currently registered for the given chat
 * id. Never starts a completion: responds 404 when no stream is registered.
 */
app.post("/v1/chats/:chatId/stream/attach", async (req, reply) => {
  requireAdmin(req);
  const { chatId } = z.object({ chatId: z.string() }).parse(req.params);

  const activeStream = activeChatStreams.get(chatId);
  if (activeStream) {
    return streamActiveRun(req, reply, activeStream);
  }
  return app.httpErrors.notFound("active chat stream not found");
});
|
|
|
|
/**
 * POST /v1/chat-completions
 *
 * Main non-streaming entry point: runs a completion through runMultiplex for
 * the requested provider and model, and returns its result together with the
 * chat id (null when none was given).
 *
 * When `chatId` is supplied, the chat must exist (404 otherwise) and the
 * incoming messages are handed to storeNonAssistantMessages before the
 * completion runs. Responds 400 when the body fails validation.
 */
app.post("/v1/chat-completions", async (req) => {
  requireAdmin(req);

  const CompletionBody = z.object({
    chatId: z.string().optional(),
    provider: ProviderSchema,
    model: z.string().min(1),
    messages: z.array(CompletionMessageSchema),
    temperature: z.number().min(0).max(2).optional(),
    maxTokens: z.number().int().positive().optional(),
  });

  const validation = CompletionBody.safeParse(req.body);
  if (!validation.success) {
    return app.httpErrors.badRequest(validation.error.message);
  }
  const body = validation.data;

  if (body.chatId) {
    // The referenced chat has to exist before anything is written to it.
    const chatRow = await prisma.chat.findUnique({
      where: { id: body.chatId },
      select: { id: true },
    });
    if (!chatRow) return app.httpErrors.notFound("chat not found");

    // Store only new non-assistant messages to avoid duplicate history entries.
    await storeNonAssistantMessages(body.chatId, body.messages);
  }

  const completion = await runMultiplex(body);
  return { chatId: body.chatId ?? null, ...completion };
});
|
|
|
|
/**
 * POST /v1/chat-completions/stream
 *
 * Streaming (SSE) completion endpoint with two modes:
 *
 * - Persisted (`persist !== false` and a `chatId` is given): incoming messages
 *   are handed to storeNonAssistantMessages, then the completion runs through
 *   startActiveChatStream and this request is served via streamActiveRun.
 *   Responds 409 when a stream is already registered for the chat.
 * - Ephemeral (otherwise): events from runMultiplexStream are written straight
 *   to the raw response as SSE events.
 *
 * Responses: 400 when the body fails validation, 404 when `chatId` is given
 * but the chat does not exist.
 */
app.post("/v1/chat-completions/stream", async (req, reply) => {
  requireAdmin(req);

  const parsed = CompletionStreamBody.safeParse(req.body);
  if (!parsed.success) return app.httpErrors.badRequest(parsed.error.message);
  const body = parsed.data;

  // ensure chat exists if provided
  if (body.chatId) {
    const exists = await prisma.chat.findUnique({ where: { id: body.chatId }, select: { id: true } });
    if (!exists) return app.httpErrors.notFound("chat not found");
  }

  if (body.persist !== false && body.chatId) {
    // Store only new non-assistant messages to avoid duplicate history entries.
    await storeNonAssistantMessages(body.chatId, body.messages);

    // NOTE(review): the conflict check runs after the messages were stored, so
    // a request rejected with 409 has already persisted its messages. Kept
    // as-is because the check and the start below have no await between them;
    // confirm whether the store should move behind an earlier check.
    if (activeChatStreams.has(body.chatId)) {
      return app.httpErrors.conflict("chat completion already running");
    }
    const stream = startActiveChatStream(body.chatId, body);
    return streamActiveRun(req, reply, stream);
  }

  // Ephemeral mode: write SSE directly to the underlying Node response.
  reply.raw.writeHead(200, buildSseHeaders(typeof req.headers.origin === "string" ? req.headers.origin : undefined));
  reply.raw.flushHeaders();

  try {
    for await (const ev of runMultiplexStream(body)) {
      writeSseEvent(reply, mapChatStreamEvent(ev));
    }
  } finally {
    // Headers are already on the wire, so a failure mid-stream cannot become a
    // normal error response. Always close the raw response so the client is
    // not left on an open connection; any error still propagates to the caller.
    if (!reply.raw.destroyed && !reply.raw.writableEnded) {
      reply.raw.end();
    }
  }
  return reply;
});
|
|
}
|