Implement streaming
This commit is contained in:
@@ -16,7 +16,7 @@ import {
|
||||
getSearch,
|
||||
listChats,
|
||||
listSearches,
|
||||
runCompletion,
|
||||
runCompletionStream,
|
||||
runSearchStream,
|
||||
type ModelCatalogResponse,
|
||||
type Provider,
|
||||
@@ -268,6 +268,7 @@ export default function App() {
|
||||
const [error, setError] = useState<string | null>(null);
|
||||
const transcriptEndRef = useRef<HTMLDivElement>(null);
|
||||
const contextMenuRef = useRef<HTMLDivElement>(null);
|
||||
const selectedItemRef = useRef<SidebarSelection | null>(null);
|
||||
const searchRunAbortRef = useRef<AbortController | null>(null);
|
||||
const searchRunCounterRef = useRef(0);
|
||||
const [contextMenu, setContextMenu] = useState<ContextMenuState | null>(null);
|
||||
@@ -404,6 +405,10 @@ export default function App() {
|
||||
|
||||
const selectedKey = selectedItem ? `${selectedItem.kind}:${selectedItem.id}` : null;
|
||||
|
||||
useEffect(() => {
|
||||
selectedItemRef.current = selectedItem;
|
||||
}, [selectedItem]);
|
||||
|
||||
useEffect(() => {
|
||||
if (!isAuthenticated) {
|
||||
setSelectedChat(null);
|
||||
@@ -438,6 +443,13 @@ export default function App() {
|
||||
const messages = selectedChat?.messages ?? [];
|
||||
const isSearchMode = draftKind ? draftKind === "search" : selectedItem?.kind === "search";
|
||||
const isSearchRunning = isSending && isSearchMode;
|
||||
const isSendingActiveChat =
|
||||
isSending &&
|
||||
!isSearchMode &&
|
||||
!!pendingChatState &&
|
||||
!!pendingChatState.chatId &&
|
||||
selectedItem?.kind === "chat" &&
|
||||
selectedItem.id === pendingChatState.chatId;
|
||||
const displayMessages = useMemo(() => {
|
||||
if (!pendingChatState) return messages;
|
||||
if (pendingChatState.chatId) {
|
||||
@@ -606,14 +618,62 @@ export default function App() {
|
||||
throw new Error("No model available for selected provider");
|
||||
}
|
||||
|
||||
await runCompletion({
|
||||
chatId,
|
||||
provider,
|
||||
model: selectedModel,
|
||||
messages: requestMessages,
|
||||
});
|
||||
let streamErrorMessage: string | null = null;
|
||||
|
||||
await Promise.all([refreshCollections({ kind: "chat", id: chatId }), refreshChat(chatId)]);
|
||||
await runCompletionStream(
|
||||
{
|
||||
chatId,
|
||||
provider,
|
||||
model: selectedModel,
|
||||
messages: requestMessages,
|
||||
},
|
||||
{
|
||||
onMeta: (payload) => {
|
||||
if (payload.chatId !== chatId) return;
|
||||
setPendingChatState((current) => (current ? { ...current, chatId: payload.chatId } : current));
|
||||
},
|
||||
onDelta: (payload) => {
|
||||
if (!payload.text) return;
|
||||
setPendingChatState((current) => {
|
||||
if (!current) return current;
|
||||
let updated = false;
|
||||
const nextMessages = current.messages.map((message, index, all) => {
|
||||
const isTarget = index === all.length - 1 && message.id.startsWith("temp-assistant-");
|
||||
if (!isTarget) return message;
|
||||
updated = true;
|
||||
return { ...message, content: message.content + payload.text };
|
||||
});
|
||||
return updated ? { ...current, messages: nextMessages } : current;
|
||||
});
|
||||
},
|
||||
onDone: (payload) => {
|
||||
setPendingChatState((current) => {
|
||||
if (!current) return current;
|
||||
let updated = false;
|
||||
const nextMessages = current.messages.map((message, index, all) => {
|
||||
const isTarget = index === all.length - 1 && message.id.startsWith("temp-assistant-");
|
||||
if (!isTarget) return message;
|
||||
updated = true;
|
||||
return { ...message, content: payload.text };
|
||||
});
|
||||
return updated ? { ...current, messages: nextMessages } : current;
|
||||
});
|
||||
},
|
||||
onError: (payload) => {
|
||||
streamErrorMessage = payload.message;
|
||||
},
|
||||
}
|
||||
);
|
||||
|
||||
if (streamErrorMessage) {
|
||||
throw new Error(streamErrorMessage);
|
||||
}
|
||||
|
||||
await refreshCollections();
|
||||
const currentSelection = selectedItemRef.current;
|
||||
if (currentSelection?.kind === "chat" && currentSelection.id === chatId) {
|
||||
await refreshChat(chatId);
|
||||
}
|
||||
setPendingChatState(null);
|
||||
};
|
||||
|
||||
@@ -914,7 +974,7 @@ export default function App() {
|
||||
|
||||
<div className="flex-1 overflow-y-auto px-3 py-6 md:px-10">
|
||||
{!isSearchMode ? (
|
||||
<ChatMessagesPanel messages={displayMessages} isLoading={isLoadingSelection} isSending={isSending} />
|
||||
<ChatMessagesPanel messages={displayMessages} isLoading={isLoadingSelection} isSending={isSendingActiveChat} />
|
||||
) : (
|
||||
<SearchResultsPanel search={selectedSearch} isLoading={isLoadingSelection} isRunning={isSearchRunning} />
|
||||
)}
|
||||
|
||||
@@ -9,7 +9,7 @@ type Props = {
|
||||
};
|
||||
|
||||
export function ChatMessagesPanel({ messages, isLoading, isSending }: Props) {
|
||||
const hasPendingAssistant = messages.some((message) => message.id.startsWith("temp-assistant-"));
|
||||
const hasPendingAssistant = messages.some((message) => message.id.startsWith("temp-assistant-") && message.content.trim().length === 0);
|
||||
|
||||
return (
|
||||
<>
|
||||
@@ -17,7 +17,7 @@ export function ChatMessagesPanel({ messages, isLoading, isSending }: Props) {
|
||||
<div className="mx-auto max-w-3xl space-y-6">
|
||||
{messages.map((message) => {
|
||||
const isUser = message.role === "user";
|
||||
const isPendingAssistant = message.id.startsWith("temp-assistant-") && isSending;
|
||||
const isPendingAssistant = message.id.startsWith("temp-assistant-") && isSending && message.content.trim().length === 0;
|
||||
return (
|
||||
<div key={message.id} className={cn("flex", isUser ? "justify-end" : "justify-start")}>
|
||||
<div
|
||||
|
||||
@@ -103,6 +103,13 @@ type CompletionResponse = {
|
||||
};
|
||||
};
|
||||
|
||||
/**
 * Optional callbacks accepted by `runCompletionStream`. Each one corresponds
 * to a server-sent event name (`meta`, `delta`, `done`, `error`) and receives
 * that event's decoded `data` payload.
 */
type CompletionStreamHandlers = {
  /** Called for `meta` events; identifies the chat, call, provider and model. */
  onMeta?: (payload: {
    chatId: string;
    callId: string;
    provider: Provider;
    model: string;
  }) => void;

  /** Called for `delta` events; `text` is the fragment carried by that event. */
  onDelta?: (payload: { text: string }) => void;

  /** Called for `done` events; carries the `text` field and optional token usage. */
  onDone?: (payload: {
    text: string;
    usage?: {
      inputTokens?: number;
      outputTokens?: number;
      totalTokens?: number;
    };
  }) => void;

  /** Called for `error` events; `message` describes the failure. */
  onError?: (payload: { message: string }) => void;
};
|
||||
|
||||
const API_BASE_URL = import.meta.env.VITE_API_BASE_URL ?? "/api";
|
||||
const ENV_ADMIN_TOKEN = (import.meta.env.VITE_ADMIN_TOKEN as string | undefined)?.trim() || null;
|
||||
let authToken: string | null = ENV_ADMIN_TOKEN;
|
||||
@@ -321,3 +328,109 @@ export async function runCompletion(body: {
|
||||
body: JSON.stringify(body),
|
||||
});
|
||||
}
|
||||
|
||||
/**
 * Starts a chat completion on the streaming endpoint and forwards every
 * server-sent event to the matching callback in `handlers`.
 *
 * The response body is read as a `text/event-stream`: `event:` lines select
 * the event name, `data:` lines accumulate the payload, and a blank line
 * dispatches the event. Payloads are JSON-decoded; anything that is not a
 * JSON object is wrapped as `{ message: <raw data text> }`.
 *
 * @param body - Completion request sent as the JSON request body.
 * @param handlers - Callbacks for `meta`, `delta`, `done` and `error` events.
 *   `onError` always receives a non-empty string `message` (the raw data text
 *   is used when the server payload has none).
 * @param options - Optional `signal` used to abort the request.
 * @returns A promise that resolves once the stream has been fully consumed.
 * @throws Error when the HTTP response is not OK (using the server's
 *   `message` when available) or when the response has no body. Errors raised
 *   while reading the stream or by a handler are rethrown after the stream is
 *   cancelled.
 */
export async function runCompletionStream(
  body: {
    chatId: string;
    provider: Provider;
    model: string;
    messages: CompletionRequestMessage[];
  },
  handlers: CompletionStreamHandlers,
  options?: { signal?: AbortSignal }
): Promise<void> {
  const headers = new Headers({
    Accept: "text/event-stream",
    "Content-Type": "application/json",
  });
  if (authToken) {
    headers.set("Authorization", `Bearer ${authToken}`);
  }

  const response = await fetch(`${API_BASE_URL}/v1/chat-completions/stream`, {
    method: "POST",
    headers,
    body: JSON.stringify(body),
    signal: options?.signal,
  });

  if (!response.ok) {
    let message = `${response.status} ${response.statusText}`;
    try {
      const errorBody = (await response.json()) as { message?: string };
      if (errorBody.message) message = errorBody.message;
    } catch {
      // Error body was not JSON; keep the status-line fallback.
    }
    throw new Error(message);
  }

  if (!response.body) {
    throw new Error("No response stream");
  }

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";
  let eventName = "message";
  let dataLines: string[] = [];

  // Per the SSE format only a single leading space after the colon is
  // dropped; any further whitespace belongs to the value.
  const fieldValue = (line: string, prefix: string): string => {
    const value = line.slice(prefix.length);
    return value.startsWith(" ") ? value.slice(1) : value;
  };

  // Dispatches the event accumulated so far (if it has data) and resets state.
  const flushEvent = (): void => {
    const name = eventName;
    const lines = dataLines;
    eventName = "message";
    dataLines = [];
    if (!lines.length) return;

    const dataText = lines.join("\n");
    let parsed: unknown;
    try {
      parsed = JSON.parse(dataText);
    } catch {
      parsed = undefined;
    }

    // Handlers dereference properties on the payload, so never hand them
    // null or a primitive: wrap those the same way as unparsable data.
    const isObject = typeof parsed === "object" && parsed !== null && !Array.isArray(parsed);
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- untrusted wire data forwarded to typed handlers
    const payload: any = isObject ? parsed : { message: dataText };

    if (name === "meta") handlers.onMeta?.(payload);
    else if (name === "delta") handlers.onDelta?.(payload);
    else if (name === "done") handlers.onDone?.(payload);
    else if (name === "error") {
      // Guarantee a usable message so callers that test `payload.message`
      // cannot silently drop a server-side failure.
      const message =
        typeof payload.message === "string" && payload.message.length > 0 ? payload.message : dataText;
      handlers.onError?.({ ...payload, message });
    }
  };

  // Interprets one line of the event stream (without its trailing "\n").
  const handleLine = (rawLine: string): void => {
    const line = rawLine.endsWith("\r") ? rawLine.slice(0, -1) : rawLine;
    if (!line) {
      flushEvent();
    } else if (line.startsWith("event:")) {
      eventName = line.slice("event:".length).trim();
    } else if (line.startsWith("data:")) {
      dataLines.push(fieldValue(line, "data:"));
    }
  };

  try {
    for (;;) {
      const { value, done } = await reader.read();
      if (done) break;

      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split("\n");
      // The last element is the (possibly empty) unterminated remainder.
      buffer = lines.pop() ?? "";
      for (const line of lines) handleLine(line);
    }

    // Flush the decoder and any final line that lacked a trailing newline.
    buffer += decoder.decode();
    if (buffer.length) handleLine(buffer);
    flushEvent();
  } catch (error: unknown) {
    // Stop the transfer so an aborted or failed read does not keep the
    // connection open; cancellation failures are irrelevant at this point.
    await reader.cancel().catch(() => undefined);
    throw error;
  } finally {
    reader.releaseLock();
  }
}
|
||||
|
||||
Reference in New Issue
Block a user