streaming response

This commit is contained in:
Avraham Sakal
2025-09-14 13:36:43 -04:00
parent 1dddae6a05
commit ebfbb22525
4 changed files with 322 additions and 238 deletions
+104 -80
View File
@@ -19,10 +19,20 @@ import {
import { usePageContext } from "vike-react/usePageContext"; import { usePageContext } from "vike-react/usePageContext";
import { useData } from "vike-react/useData"; import { useData } from "vike-react/useData";
import type { Data } from "./+data"; import type { Data } from "./+data";
import type { CommittedMessage, DraftMessage } from "../../../types"; import type {
CommittedMessage,
DraftMessage,
OtherParameters,
} from "../../../types";
import Markdown from "react-markdown"; import Markdown from "react-markdown";
import { IconTrash, IconEdit, IconCheck, IconX } from "@tabler/icons-react"; import {
import { useTRPC } from "../../../trpc/client"; IconTrash,
IconEdit,
IconCheck,
IconX,
IconLoaderQuarter,
} from "@tabler/icons-react";
import { useTRPC, useTRPCClient } from "../../../trpc/client";
import { useMutation, useQuery, useQueryClient } from "@tanstack/react-query"; import { useMutation, useQuery, useQueryClient } from "@tanstack/react-query";
import { nanoid } from "nanoid"; import { nanoid } from "nanoid";
import type { Conversation } from "../../../database/common"; import type { Conversation } from "../../../database/common";
@@ -49,6 +59,7 @@ export default function ChatPage() {
const setParameters = useStore((state) => state.setParameters); const setParameters = useStore((state) => state.setParameters);
const setLoading = useStore((state) => state.setLoading); const setLoading = useStore((state) => state.setLoading);
const trpc = useTRPC(); const trpc = useTRPC();
const trpcClient = useTRPCClient();
const queryClient = useQueryClient(); const queryClient = useQueryClient();
const messagesResult = useQuery( const messagesResult = useQuery(
@@ -334,83 +345,94 @@ export default function ChatPage() {
}) })
); );
const sendMessage = useMutation( // Get state from Zustand store
trpc.chat.sendMessage.mutationOptions({ const sendMessageStatus = useStore((state) => state.sendMessageStatus);
onMutate: async ({ const isSendingMessage = useStore((state) => state.isSendingMessage);
conversationId, const setSendMessageStatus = useStore((state) => state.setSendMessageStatus);
messages, const setIsSendingMessage = useStore((state) => state.setIsSendingMessage);
systemPrompt,
parameters, // Function to send message using subscription
}) => { const sendSubscriptionMessage = async ({
/** Cancel affected queries that may be in-flight: */ conversationId,
await queryClient.cancelQueries({ messages,
queryKey: trpc.chat.messages.fetchByConversationId.queryKey({ systemPrompt,
conversationId, parameters,
}), }: {
}); conversationId: string;
/** Optimistically update the affected queries in react-query's cache: */ messages: Array<DraftMessage | CommittedMessage>;
const previousMessages: Array<CommittedMessage> | undefined = systemPrompt: string;
await queryClient.getQueryData( parameters: OtherParameters;
trpc.chat.messages.fetchByConversationId.queryKey({ }) => {
conversationId, setIsSendingMessage(true);
}) setSendMessageStatus(null);
);
if (!previousMessages) { try {
return { // Create an abort controller for the subscription
previousMessages: [], const abortController = new AbortController();
newMessages: [],
}; // Start the subscription
const subscription = trpcClient.chat.sendMessage.subscribe(
{
conversationId,
messages,
systemPrompt,
parameters,
},
{
signal: abortController.signal,
onData: (data) => {
setSendMessageStatus(data);
// If we've completed, update the UI and invalidate queries
if (data.status === "completed") {
setIsSendingMessage(false);
// Invalidate queries to refresh the data
queryClient.invalidateQueries({
queryKey: trpc.chat.messages.fetchByConversationId.queryKey({
conversationId,
}),
});
queryClient.invalidateQueries({
queryKey: trpc.chat.facts.fetchByConversationId.queryKey({
conversationId,
}),
});
queryClient.invalidateQueries({
queryKey: trpc.chat.factTriggers.fetchByConversationId.queryKey(
{
conversationId,
}
),
});
} else {
setSendMessageStatus(data);
}
},
onError: (error) => {
console.error("Subscription error:", error);
setIsSendingMessage(false);
setSendMessageStatus({
status: "error",
message: "An error occurred while sending the message",
});
},
} }
const newMessages: Array<CommittedMessage> = [ );
...previousMessages,
{ // Return a function to unsubscribe if needed
/** placeholder id; will be overwritten when we get the true id from the backend */ return () => {
id: nanoid(), abortController.abort();
conversationId, subscription.unsubscribe();
// content: messages[messages.length - 1].content, };
// role: "user" as const, } catch (error) {
...messages[messages.length - 1], console.error("Failed to start subscription:", error);
index: previousMessages.length, setIsSendingMessage(false);
createdAt: new Date().toISOString(), setSendMessageStatus({
} as CommittedMessage, status: "error",
]; message: "Failed to start message sending process",
queryClient.setQueryData( });
trpc.chat.messages.fetchByConversationId.queryKey({ }
conversationId, };
}),
newMessages
);
return { previousMessages, newMessages };
},
onSettled: async (data, variables, context) => {
await queryClient.invalidateQueries({
queryKey: trpc.chat.messages.fetchByConversationId.queryKey({
conversationId,
}),
});
await queryClient.invalidateQueries({
queryKey: trpc.chat.facts.fetchByConversationId.queryKey({
conversationId,
}),
});
await queryClient.invalidateQueries({
queryKey: trpc.chat.factTriggers.fetchByConversationId.queryKey({
conversationId,
}),
});
},
onError: async (error, variables, context) => {
console.error(error);
if (!context) return;
queryClient.setQueryData(
trpc.chat.messages.fetchByConversationId.queryKey({
conversationId,
}),
context.previousMessages
);
},
})
);
// State for editing facts // State for editing facts
const [editingFactId, setEditingFactId] = useState<string | null>(null); const [editingFactId, setEditingFactId] = useState<string | null>(null);
@@ -483,6 +505,8 @@ export default function ChatPage() {
}); });
}} }}
/> />
{isSendingMessage && <IconLoaderQuarter size={16} stroke={1.5} />}
{sendMessageStatus && <span>{sendMessageStatus.message}</span>}
</div> </div>
<Tabs defaultValue="message"> <Tabs defaultValue="message">
<Tabs.List> <Tabs.List>
@@ -504,7 +528,7 @@ export default function ChatPage() {
if (e.key === "Enter") { if (e.key === "Enter") {
e.preventDefault(); e.preventDefault();
setLoading(true); setLoading(true);
await sendMessage.mutateAsync({ await sendSubscriptionMessage({
conversationId, conversationId,
messages: [ messages: [
...(messages || []), ...(messages || []),
+198 -158
View File
@@ -74,173 +74,213 @@ export const chat = router({
parameters: OtherParameters; parameters: OtherParameters;
} }
) )
.mutation( .subscription(async function* ({
async ({ input: { conversationId, messages, systemPrompt, parameters },
input: { conversationId, messages, systemPrompt, parameters }, }) {
}) => { /** TODO: Save all unsaved messages (i.e. those without an `id`) to the
/** TODO: Save all unsaved messages (i.e. those without an `id`) to the * database. Is this dangerous? Can an attacker just send a bunch of
* database. Is this dangerous? Can an attacker just send a bunch of * messages, omitting the ids, causing me to save a bunch of them to the
* messages, omitting the ids, causing me to save a bunch of them to the * database? I guess it's no worse than starting new conversations, which
* database? I guess it's no worse than starting new conversations, which * anyone can freely do. */
* anyone can freely do. */ const previousRunningSummaryIndex = messages.findLastIndex(
const previousRunningSummaryIndex = messages.findLastIndex( (message) =>
(message) => typeof (message as CommittedMessage).runningSummary !== "undefined"
typeof (message as CommittedMessage).runningSummary !== "undefined" );
); const previousRunningSummary =
const previousRunningSummary = previousRunningSummaryIndex >= 0
previousRunningSummaryIndex >= 0 ? ((messages[previousRunningSummaryIndex] as CommittedMessage)
? ((messages[previousRunningSummaryIndex] as CommittedMessage) .runningSummary as string)
.runningSummary as string) : "";
: ""; const messagesSincePreviousRunningSummary = messages.slice(
const messagesSincePreviousRunningSummary = messages.slice( previousRunningSummaryIndex + 1
previousRunningSummaryIndex + 1 );
);
/** Save the incoming message to the database. */
const insertedUserMessage = await db.messages.create({
conversationId,
// content: messages[messages.length - 1].content,
// role: "user" as const,
...messages[messages.length - 1],
index: messages.length - 1,
createdAt: new Date().toISOString(),
});
/** Generate a new message from the model, but hold-off on adding it to // Emit status update
* the database until we produce the associated running-summary, below. yield {
* The model should be given the conversation summary thus far, and of status: "saving_user_message",
* course the user's latest message, unmodified. Invite the model to message: "Saving user message...",
* create any tools it needs. The tool needs to be implemented in a } as const;
* language which this system can execute; usually an interpreted
* language like Python or JavaScript. */
const mainResponse = await generateText({
model: openrouter(MODEL_NAME),
messages: [
previousRunningSummary === ""
? {
role: "system" as const,
content: systemPrompt,
}
: {
role: "system" as const,
content: mainSystemPrompt({
systemPrompt,
previousRunningSummary,
}),
},
...messagesSincePreviousRunningSummary.map((m) => ({
role: m.role,
content: m.parts
.filter((p) => p.type === "text")
.map((p) => p.text)
.join(""),
})),
],
tools: undefined,
...parameters,
});
/** Extract Facts from the user's message, and add them to the database,
* linking the Facts with the messages they came from. (Yes, this should
* be done *after* the model response, not before; because when we run a
* query to find Facts to inject into the context sent to the model, we
* don't want Facts from the user's current message to be candidates for
* injection, because we're sending the user's message unadulterated to
* the model; there's no reason to inject the same Facts that the model is
* already using to generate its response.) */
const factsFromUserMessageResponse =
await factsCaller.extractFromNewMessages({
previousRunningSummary,
messagesSincePreviousRunningSummary: [],
newMessages: messagesSincePreviousRunningSummary,
});
const insertedFactsFromUserMessage = await db.facts.createMany(
factsFromUserMessageResponse.object.facts.map((fact) => ({
userId: "019900bb-61b3-7333-b760-b27784dfe33b",
sourceMessageId: insertedUserMessage.id,
content: fact,
}))
);
/** Produce a running summary of the conversation, and save that along /** Save the incoming message to the database. */
* with the model's response to the database. The new running summary is const insertedUserMessage = await db.messages.create({
* based on the previous running summary combined with all the messages conversationId,
* since that summary was produced. */ // content: messages[messages.length - 1].content,
const runningSummaryResponse = // role: "user" as const,
await messagesCaller.generateRunningSummary({ ...messages[messages.length - 1],
messagesSincePreviousRunningSummary, index: messages.length - 1,
mainResponseContent: mainResponse.text, createdAt: new Date().toISOString(),
previousRunningSummary, });
});
const insertedAssistantMessage = await db.messages.create({ // Emit status update
conversationId, yield {
// content: mainResponse.text, status: "generating_response",
parts: [{ type: "text", text: mainResponse.text }], message: "Generating AI response...",
runningSummary: runningSummaryResponse.text, } as const;
role: "assistant" as const,
index: messages.length, /** Generate a new message from the model, but hold-off on adding it to
createdAt: new Date().toISOString(), * the database until we produce the associated running-summary, below.
}); * The model should be given the conversation summary thus far, and of
/** Extract Facts from the model's response, and add them to the database, * course the user's latest message, unmodified. Invite the model to
* linking the Facts with the messages they came from. */ * create any tools it needs. The tool needs to be implemented in a
const factsFromAssistantMessageResponse = * language which this system can execute; usually an interpretted
await factsCaller.extractFromNewMessages({ * language like Python or JavaScript. */
previousRunningSummary, const mainResponse = await generateText({
messagesSincePreviousRunningSummary, model: openrouter(MODEL_NAME),
newMessages: [ messages: [
{ previousRunningSummary === ""
role: "assistant" as const, ? {
// content: mainResponse.text, role: "system" as const,
parts: [{ type: "text", text: mainResponse.text }], content: systemPrompt,
}
: {
role: "system" as const,
content: mainSystemPrompt({
systemPrompt,
previousRunningSummary,
}),
}, },
], ...messagesSincePreviousRunningSummary.map((m) => ({
}); role: m.role,
content: m.parts
.filter((p) => p.type === "text")
.map((p) => p.text)
.join(""),
})),
],
tools: undefined,
...parameters,
});
const insertedFactsFromAssistantMessage = await db.facts.createMany( // Emit status update
factsFromAssistantMessageResponse.object.facts.map((factContent) => ({ yield {
userId: "019900bb-61b3-7333-b760-b27784dfe33b", status: "extracting_facts_from_user",
sourceMessageId: insertedAssistantMessage.id, message: "Extracting facts from user message...",
content: factContent, } as const;
/** Extract Facts from the user's message, and add them to the database,
* linking the Facts with the messages they came from. (Yes, this should
* be done *after* the model response, not before; because when we run a
* query to find Facts to inject into the context sent to the model, we
* don't want Facts from the user's current message to be candidates for
* injection, because we're sending the user's message unadulterated to
* the model; there's no reason to inject the same Facts that the model is
* already using to generate its response.) */
const factsFromUserMessageResponse =
await factsCaller.extractFromNewMessages({
previousRunningSummary,
messagesSincePreviousRunningSummary: [],
newMessages: messagesSincePreviousRunningSummary,
});
const insertedFactsFromUserMessage = await db.facts.createMany(
factsFromUserMessageResponse.object.facts.map((fact) => ({
userId: "019900bb-61b3-7333-b760-b27784dfe33b",
sourceMessageId: insertedUserMessage.id,
content: fact,
}))
);
// Emit status update
yield {
status: "generating_summary",
message: "Generating conversation summary...",
} as const;
/** Produce a running summary of the conversation, and save that along
* with the model's response to the database. The new running summary is
* based on the previous running summary combined with all the messages
* since that summary was produced. */
const runningSummaryResponse =
await messagesCaller.generateRunningSummary({
messagesSincePreviousRunningSummary,
mainResponseContent: mainResponse.text,
previousRunningSummary,
});
const insertedAssistantMessage = await db.messages.create({
conversationId,
// content: mainResponse.text,
parts: [{ type: "text", text: mainResponse.text }],
runningSummary: runningSummaryResponse.text,
role: "assistant" as const,
index: messages.length,
createdAt: new Date().toISOString(),
});
// Emit status update
yield {
status: "extracting_facts_from_assistant",
message: "Extracting facts from assistant response...",
} as const;
/** Extract Facts from the model's response, and add them to the database,
* linking the Facts with the messages they came from. */
const factsFromAssistantMessageResponse =
await factsCaller.extractFromNewMessages({
previousRunningSummary,
messagesSincePreviousRunningSummary,
newMessages: [
{
role: "assistant" as const,
// content: mainResponse.text,
parts: [{ type: "text", text: mainResponse.text }],
},
],
});
const insertedFactsFromAssistantMessage = await db.facts.createMany(
factsFromAssistantMessageResponse.object.facts.map((factContent) => ({
userId: "019900bb-61b3-7333-b760-b27784dfe33b",
sourceMessageId: insertedAssistantMessage.id,
content: factContent,
createdAt: new Date().toISOString(),
}))
);
const insertedFacts = [
...insertedFactsFromUserMessage,
...insertedFactsFromAssistantMessage,
];
// Emit status update
yield {
status: "generating_fact_triggers",
message: "Generating fact triggers...",
} as const;
/** For each Fact produced in the two fact-extraction steps, generate
* FactTriggers and add them to the database, linking the FactTriggers
* with the Facts they came from. A FactTrigger is a natural language
* phrase that describes a situation in which it would be useful to invoke
* the Fact. (e.g., "When food preferences are discussed"). */
for (const fact of insertedFacts) {
const factTriggers = await factTriggerCaller.generateFromFact({
mainResponseContent: mainResponse.text,
previousRunningSummary,
messagesSincePreviousRunningSummary,
fact,
});
const insertedFactTriggers: Array<Omit<FactTrigger, "id">> =
factTriggers.object.factTriggers.map((factTrigger) => ({
sourceFactId: fact.id,
content: factTrigger,
priorityMultiplier: 1,
priorityMultiplierReason: "",
scopeConversationId: conversationId,
createdAt: new Date().toISOString(), createdAt: new Date().toISOString(),
})) }));
); await db.factTriggers.createMany(insertedFactTriggers);
}
const insertedFacts = [ // Emit final result
...insertedFactsFromUserMessage, yield {
...insertedFactsFromAssistantMessage, status: "completed",
]; message: "Completed!",
result: {
/** For each Fact produced in the two fact-extraction steps, generate
* FactTriggers and add them to the database, linking the FactTriggers
* with the Facts they came from. A FactTrigger is a natural language
* phrase that describes a situation in which it would be useful to invoke
* the Fact. (e.g., "When food preferences are discussed"). */
for (const fact of insertedFacts) {
const factTriggers = await factTriggerCaller.generateFromFact({
mainResponseContent: mainResponse.text,
previousRunningSummary,
messagesSincePreviousRunningSummary,
fact,
});
const insertedFactTriggers: Array<Omit<FactTrigger, "id">> =
factTriggers.object.factTriggers.map((factTrigger) => ({
sourceFactId: fact.id,
content: factTrigger,
priorityMultiplier: 1,
priorityMultiplierReason: "",
scopeConversationId: conversationId,
createdAt: new Date().toISOString(),
}));
db.factTriggers.createMany(insertedFactTriggers);
}
// await db.write();
return {
insertedAssistantMessage, insertedAssistantMessage,
insertedUserMessage, insertedUserMessage,
insertedFacts, insertedFacts,
}; },
} } as const;
), }),
}); });
export const createCaller = createCallerFactory(chat); export const createCaller = createCallerFactory(chat);
+10
View File
@@ -19,6 +19,8 @@ export const useStore = create<Store>()(
facts: [], facts: [],
factTriggers: [], factTriggers: [],
loading: false, loading: false,
sendMessageStatus: null,
isSendingMessage: false,
setConversationId: (conversationId) => setConversationId: (conversationId) =>
set((stateDraft) => { set((stateDraft) => {
stateDraft.selectedConversationId = conversationId; stateDraft.selectedConversationId = conversationId;
@@ -92,5 +94,13 @@ export const useStore = create<Store>()(
set((stateDraft) => { set((stateDraft) => {
stateDraft.loading = loading; stateDraft.loading = loading;
}), }),
/** Replaces the stored `sendMessageStatus` with `status`; passing `null`
 * clears it. */
setSendMessageStatus: (status) =>
set((stateDraft) => {
stateDraft.sendMessageStatus = status;
}),
/** Sets the `isSendingMessage` flag to `isSending`. */
setIsSendingMessage: (isSending) =>
set((stateDraft) => {
stateDraft.isSendingMessage = isSending;
}),
})), })),
); );
+10
View File
@@ -9,6 +9,12 @@ export type OtherParameters = Omit<
export type ConversationUI = Conversation & {}; export type ConversationUI = Conversation & {};
/** A progress update for an in-flight "send message" request, as kept in the
 * store's `sendMessageStatus` field. */
export type SendMessageStatus = {
/** Identifier of the current step (e.g. "generating_response", "completed",
 * "error"). Typed as a plain `string`, so the set of values is not enforced
 * by the compiler. */
status: string;
/** Human-readable description of the current step. */
message: string;
/** Optional payload accompanying an update. NOTE(review): typed as `any`, so
 * consumers get no type checking when reading it; consider a precise type. */
result?: any;
};
export type Store = { export type Store = {
/** This is a string because Milvus sends it as a string, and the value /** This is a string because Milvus sends it as a string, and the value
* overflows the JS integer anyway. */ * overflows the JS integer anyway. */
@@ -21,6 +27,8 @@ export type Store = {
facts: Array<Fact>; facts: Array<Fact>;
factTriggers: Array<FactTrigger>; factTriggers: Array<FactTrigger>;
loading: boolean; loading: boolean;
sendMessageStatus: SendMessageStatus | null;
isSendingMessage: boolean;
setConversationId: (conversationId: string) => void; setConversationId: (conversationId: string) => void;
setConversationTitle: (conversationTitle: string) => void; setConversationTitle: (conversationTitle: string) => void;
setConversations: (conversations: Array<ConversationUI>) => void; setConversations: (conversations: Array<ConversationUI>) => void;
@@ -35,6 +43,8 @@ export type Store = {
removeFact: (factId: string) => void; removeFact: (factId: string) => void;
removeFactTrigger: (factTriggerId: string) => void; removeFactTrigger: (factTriggerId: string) => void;
setLoading: (loading: boolean) => void; setLoading: (loading: boolean) => void;
setSendMessageStatus: (status: SendMessageStatus | null) => void;
setIsSendingMessage: (isSending: boolean) => void;
}; };
/** The message while it's being typed in the input box. */ /** The message while it's being typed in the input box. */