quick question feature
@@ -10,11 +10,17 @@ import {
 import { buildAnthropicConversationMessage, getAnthropicSystemPrompt } from "./message-content.js";
 import type { MultiplexRequest, Provider } from "./types.js";
 
+type StreamUsage = {
+  inputTokens?: number;
+  outputTokens?: number;
+  totalTokens?: number;
+};
+
 export type StreamEvent =
-  | { type: "meta"; chatId: string; callId: string; provider: Provider; model: string }
+  | { type: "meta"; chatId: string | null; callId: string | null; provider: Provider; model: string }
   | { type: "tool_call"; event: ToolExecutionEvent }
   | { type: "delta"; text: string }
-  | { type: "done"; text: string; usage?: { inputTokens?: number; outputTokens?: number; totalTokens?: number } }
+  | { type: "done"; text: string; usage?: StreamUsage }
   | { type: "error"; message: string };
 
 function getChatIdOrCreate(chatId?: string) {
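
Since chatId and callId in the meta variant can now be null, downstream consumers have to narrow on the type discriminant before using them. A minimal consumer sketch; the Provider and ToolExecutionEvent stand-ins and the handleEvent helper are illustrative only, not part of this commit:

// Stand-ins for illustration; the real types are imported from other modules of this repo.
type Provider = "openai" | "anthropic" | "xai";
type ToolExecutionEvent = { name: string; arguments?: unknown };

type StreamUsage = { inputTokens?: number; outputTokens?: number; totalTokens?: number };

type StreamEvent =
  | { type: "meta"; chatId: string | null; callId: string | null; provider: Provider; model: string }
  | { type: "tool_call"; event: ToolExecutionEvent }
  | { type: "delta"; text: string }
  | { type: "done"; text: string; usage?: StreamUsage }
  | { type: "error"; message: string };

// The switch narrows each branch; null ids simply mean the call was not persisted.
function handleEvent(ev: StreamEvent): void {
  switch (ev.type) {
    case "meta":
      console.log(`call ${ev.callId ?? "(not persisted)"} via ${ev.provider}/${ev.model}`);
      break;
    case "delta":
      process.stdout.write(ev.text);
      break;
    case "tool_call":
      console.log(`tool event: ${ev.event.name}`);
      break;
    case "done":
      console.log(`\ndone, total tokens: ${ev.usage?.totalTokens ?? "n/a"}`);
      break;
    case "error":
      console.error(ev.message);
      break;
  }
}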
@@ -24,39 +30,45 @@ function getChatIdOrCreate(chatId?: string) {
 
 export async function* runMultiplexStream(req: MultiplexRequest): AsyncGenerator<StreamEvent> {
   const t0 = performance.now();
-  const chatId = await getChatIdOrCreate(req.chatId);
+  const shouldPersist = req.persist !== false;
+  const chatId = shouldPersist ? await getChatIdOrCreate(req.chatId) : null;
 
-  const call = await prisma.llmCall.create({
-    data: {
-      chatId,
-      provider: req.provider as any,
-      model: req.model,
-      request: req as any,
-    },
-    select: { id: true },
-  });
+  const call =
+    shouldPersist && chatId
+      ? await prisma.llmCall.create({
+          data: {
+            chatId,
+            provider: req.provider as any,
+            model: req.model,
+            request: req as any,
+          },
+          select: { id: true },
+        })
+      : null;
 
-  await prisma.$transaction([
-    prisma.chat.update({
-      where: { id: chatId },
-      data: {
-        lastUsedProvider: req.provider as any,
-        lastUsedModel: req.model,
-      },
-    }),
-    prisma.chat.updateMany({
-      where: { id: chatId, initiatedProvider: null },
-      data: {
-        initiatedProvider: req.provider as any,
-        initiatedModel: req.model,
-      },
-    }),
-  ]);
+  if (shouldPersist && chatId) {
+    await prisma.$transaction([
+      prisma.chat.update({
+        where: { id: chatId },
+        data: {
+          lastUsedProvider: req.provider as any,
+          lastUsedModel: req.model,
+        },
+      }),
+      prisma.chat.updateMany({
+        where: { id: chatId, initiatedProvider: null },
+        data: {
+          initiatedProvider: req.provider as any,
+          initiatedModel: req.model,
+        },
+      }),
+    ]);
+  }
 
-  yield { type: "meta", chatId, callId: call.id, provider: req.provider, model: req.model };
+  yield { type: "meta", chatId, callId: call?.id ?? null, provider: req.provider, model: req.model };
 
   let text = "";
-  let usage: StreamEvent extends any ? any : never;
+  let usage: StreamUsage | undefined;
   let raw: unknown = { streamed: true };
 
   try {
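
With this gating, a caller that only wants a throwaway answer can disable persistence and still stream normally: no chat or llmCall rows are created, and the meta event carries null ids. A usage sketch; the import path, model id, and the simplified role/content message shape are placeholders, not values from the diff:

// Hypothetical caller; the module path is illustrative only.
import { runMultiplexStream } from "./multiplex-stream.js";

const events = runMultiplexStream({
  provider: "openai",
  model: "example-model", // placeholder id; any configured model would do
  messages: [{ role: "user", content: "Quick question: what does UTC+2 map to in summer?" }],
  persist: false, // skip chat/llmCall rows entirely
});

let answer = "";
for await (const ev of events) {
  if (ev.type === "meta") console.log(ev.chatId, ev.callId); // both null in this mode
  if (ev.type === "delta") answer += ev.text;
  if (ev.type === "error") throw new Error(ev.message);
}
console.log(answer);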
@@ -73,7 +85,7 @@ export async function* runMultiplexStream(req: MultiplexRequest): AsyncGenerator
           logContext: {
             provider: req.provider,
             model: req.model,
-            chatId,
+            chatId: chatId ?? undefined,
           },
         })
       : runToolAwareChatCompletionsStream({
@@ -85,7 +97,7 @@ export async function* runMultiplexStream(req: MultiplexRequest): AsyncGenerator
           logContext: {
             provider: req.provider,
             model: req.model,
-            chatId,
+            chatId: chatId ?? undefined,
           },
         });
     for await (const ev of streamEvents) {
@@ -96,16 +108,18 @@ export async function* runMultiplexStream(req: MultiplexRequest): AsyncGenerator
       }
 
       if (ev.type === "tool_call") {
-        const toolMessage = buildToolLogMessageData(chatId, ev.event);
-        await prisma.message.create({
-          data: {
-            chatId: toolMessage.chatId,
-            role: toolMessage.role as any,
-            content: toolMessage.content,
-            name: toolMessage.name,
-            metadata: toolMessage.metadata as any,
-          },
-        });
+        if (shouldPersist && chatId) {
+          const toolMessage = buildToolLogMessageData(chatId, ev.event);
+          await prisma.message.create({
+            data: {
+              chatId: toolMessage.chatId,
+              role: toolMessage.role as any,
+              content: toolMessage.content,
+              name: toolMessage.name,
+              metadata: toolMessage.metadata as any,
+            },
+          });
+        }
         yield { type: "tool_call", event: ev.event };
         continue;
       }
@@ -156,32 +170,36 @@ export async function* runMultiplexStream(req: MultiplexRequest): AsyncGenerator
 
     const latencyMs = Math.round(performance.now() - t0);
 
-    await prisma.$transaction(async (tx) => {
-      await tx.message.create({
-        data: { chatId, role: "assistant" as any, content: text },
-      });
-      await tx.llmCall.update({
-        where: { id: call.id },
-        data: {
-          response: raw as any,
-          latencyMs,
-          inputTokens: usage?.inputTokens,
-          outputTokens: usage?.outputTokens,
-          totalTokens: usage?.totalTokens,
-        },
-      });
-    });
+    if (shouldPersist && chatId && call) {
+      await prisma.$transaction(async (tx) => {
+        await tx.message.create({
+          data: { chatId, role: "assistant" as any, content: text },
+        });
+        await tx.llmCall.update({
+          where: { id: call.id },
+          data: {
+            response: raw as any,
+            latencyMs,
+            inputTokens: usage?.inputTokens,
+            outputTokens: usage?.outputTokens,
+            totalTokens: usage?.totalTokens,
+          },
+        });
+      });
+    }
 
     yield { type: "done", text, usage };
   } catch (e: any) {
     const latencyMs = Math.round(performance.now() - t0);
-    await prisma.llmCall.update({
-      where: { id: call.id },
-      data: {
-        error: e?.message ?? String(e),
-        latencyMs,
-      },
-    });
+    if (shouldPersist && call) {
+      await prisma.llmCall.update({
+        where: { id: call.id },
+        data: {
+          error: e?.message ?? String(e),
+          latencyMs,
+        },
+      });
+    }
     yield { type: "error", message: e?.message ?? String(e) };
   }
 }

@@ -30,6 +30,7 @@ export type ChatMessage = {
 
 export type MultiplexRequest = {
   chatId?: string;
+  persist?: boolean;
   provider: Provider;
   model: string;
   messages: ChatMessage[];

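The flag is optional, and the runner above treats anything other than an explicit false as "keep persisting", so existing callers are unaffected. A tiny illustration of that default:

// Mirrors the runner's check: req.persist !== false
const shouldPersist = (req: { persist?: boolean }) => req.persist !== false;

shouldPersist({});                 // true  -> unchanged behaviour for existing callers
shouldPersist({ persist: true });  // true
shouldPersist({ persist: false }); // false -> quick-question mode, nothing stored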
@@ -327,10 +327,50 @@ export async function registerRoutes(app: FastifyInstance) {
 
   app.post("/v1/chats", async (req) => {
     requireAdmin(req);
-    const Body = z.object({ title: z.string().optional() });
-    const body = Body.parse(req.body ?? {});
+    const Body = z
+      .object({
+        title: z.string().optional(),
+        provider: z.enum(["openai", "anthropic", "xai"]).optional(),
+        model: z.string().trim().min(1).optional(),
+        messages: z.array(CompletionMessageSchema).optional(),
+      })
+      .superRefine((value, ctx) => {
+        if (value.provider && !value.model) {
+          ctx.addIssue({
+            code: z.ZodIssueCode.custom,
+            message: "model is required when provider is supplied",
+            path: ["model"],
+          });
+        }
+        if (!value.provider && value.model) {
+          ctx.addIssue({
+            code: z.ZodIssueCode.custom,
+            message: "provider is required when model is supplied",
+            path: ["provider"],
+          });
+        }
+      });
+    const parsed = Body.safeParse(req.body ?? {});
+    if (!parsed.success) return app.httpErrors.badRequest(parsed.error.message);
+    const body = parsed.data;
     const chat = await prisma.chat.create({
-      data: { title: body.title },
+      data: {
+        title: body.title,
+        initiatedProvider: body.provider as any,
+        initiatedModel: body.model,
+        lastUsedProvider: body.provider as any,
+        lastUsedModel: body.model,
+        messages: body.messages?.length
+          ? {
+              create: body.messages.map((message) => ({
+                role: message.role as any,
+                content: message.content,
+                name: message.name,
+                metadata: message.attachments?.length ? ({ attachments: message.attachments } as any) : undefined,
+              })),
+            }
+          : undefined,
+      },
       select: {
         id: true,
         title: true,
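
The paired superRefine checks make provider and model all-or-nothing when seeding a chat. A sketch of bodies the endpoint now accepts or rejects; field values are made up, and the message shape follows CompletionMessageSchema only loosely:

// Accepted: creates the chat, records initiated/lastUsed provider+model, and seeds history.
const seeded = {
  title: "Quick question",
  provider: "anthropic",
  model: "example-model",
  messages: [{ role: "user", content: "hello" }],
};

// Accepted: the old minimal form still works because every new field is optional.
const bare = { title: "untitled" };

// Rejected: provider without model trips the first superRefine branch;
// the route replies with badRequest(parsed.error.message).
const invalid = { provider: "openai" };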
@@ -838,7 +878,9 @@ export async function registerRoutes(app: FastifyInstance) {
     });
 
     const { chatId } = Params.parse(req.params);
-    const body = Body.parse(req.body);
+    const parsed = Body.safeParse(req.body);
+    if (!parsed.success) return app.httpErrors.badRequest(parsed.error.message);
+    const body = parsed.data;
 
     const msg = await prisma.message.create({
       data: {
@@ -866,7 +908,9 @@ export async function registerRoutes(app: FastifyInstance) {
       maxTokens: z.number().int().positive().optional(),
     });
 
-    const body = Body.parse(req.body);
+    const parsed = Body.safeParse(req.body);
+    if (!parsed.success) return app.httpErrors.badRequest(parsed.error.message);
+    const body = parsed.data;
 
     // ensure chat exists if provided
     if (body.chatId) {
@@ -891,16 +935,29 @@ export async function registerRoutes(app: FastifyInstance) {
   app.post("/v1/chat-completions/stream", async (req, reply) => {
     requireAdmin(req);
 
-    const Body = z.object({
-      chatId: z.string().optional(),
-      provider: z.enum(["openai", "anthropic", "xai"]),
-      model: z.string().min(1),
-      messages: z.array(CompletionMessageSchema),
-      temperature: z.number().min(0).max(2).optional(),
-      maxTokens: z.number().int().positive().optional(),
-    });
+    const Body = z
+      .object({
+        chatId: z.string().optional(),
+        persist: z.boolean().optional(),
+        provider: z.enum(["openai", "anthropic", "xai"]),
+        model: z.string().min(1),
+        messages: z.array(CompletionMessageSchema),
+        temperature: z.number().min(0).max(2).optional(),
+        maxTokens: z.number().int().positive().optional(),
+      })
+      .superRefine((value, ctx) => {
+        if (value.persist === false && value.chatId) {
+          ctx.addIssue({
+            code: z.ZodIssueCode.custom,
+            message: "chatId must be omitted when persist is false",
+            path: ["chatId"],
+          });
+        }
+      });
 
-    const body = Body.parse(req.body);
+    const parsed = Body.safeParse(req.body);
+    if (!parsed.success) return app.httpErrors.badRequest(parsed.error.message);
+    const body = parsed.data;
 
     // ensure chat exists if provided
    if (body.chatId) {
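
On the streaming endpoint the new flag and chatId are mutually exclusive: a quick question should not be attached to a stored chat. A sketch of the two cases, with placeholder values and a simplified message shape:

// Accepted: one-off question, no chat row, no llmCall row, no stored messages.
const quickQuestion = {
  provider: "xai",
  model: "example-model",
  messages: [{ role: "user", content: "ping" }],
  persist: false,
};

// Rejected by superRefine with "chatId must be omitted when persist is false".
const conflicting = { ...quickQuestion, chatId: "some-existing-chat-id" };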
@@ -909,7 +966,7 @@ export async function registerRoutes(app: FastifyInstance) {
     }
 
     // Store only new non-assistant messages to avoid duplicate history entries.
-    if (body.chatId) {
+    if (body.persist !== false && body.chatId) {
       await storeNonAssistantMessages(body.chatId, body.messages);
     }
 