Files
ST-Bionic-Memory-Ecology/tests/llm-streaming.mjs
2026-04-11 22:23:18 +08:00

488 lines
13 KiB
JavaScript

import assert from "node:assert/strict";
import { createRequire } from "node:module";
import {
installResolveHooks,
toDataModuleUrl,
} from "./helpers/register-hooks-compat.mjs";
/**
 * Minimal stand-ins for the SillyTavern host modules imported by the code
 * under test. Each source is served as a data: module URL through the
 * resolve hooks installed below.
 */
const extensionsShimSource = [
  "export const extension_settings = globalThis.__llmStreamingExtensionSettings || {};",
  "export function getContext() {",
  " return null;",
  "}",
].join("\n");
const scriptShimSource = [
  "export function getRequestHeaders() {",
  " return { 'Content-Type': 'application/json' };",
  "}",
].join("\n");
const openAiShimSource = [
  "export const chat_completion_sources = { CUSTOM: 'custom', OPENAI: 'openai' };",
  "export async function sendOpenAIRequest(...args) {",
  " if (typeof globalThis.__llmStreamingSendOpenAIRequest === 'function') {",
  " return await globalThis.__llmStreamingSendOpenAIRequest(...args);",
  " }",
  " return { choices: [{ message: { content: '{}' } }] };",
  "}",
].join("\n");

// Relative specifiers for `fileName`, one per entry in `depths`, where each
// depth is the number of "../" segments prefixed to the file name.
const hostSpecifiers = (fileName, depths) =>
  depths.map((depth) => `${"../".repeat(depth)}${fileName}`);

installResolveHooks(
  [
    ["extensions.js", [3, 4, 5], extensionsShimSource],
    ["script.js", [4, 5], scriptShimSource],
    ["openai.js", [3, 4], openAiShimSource],
  ].map(([fileName, depths, source]) => ({
    specifiers: hostSpecifiers(fileName, depths),
    url: toDataModuleUrl(source),
  })),
);
const require = createRequire(import.meta.url);
const originalRequire = globalThis.require;
const originalExtensionSettings = globalThis.__llmStreamingExtensionSettings;
const originalSendOpenAIRequest = globalThis.__llmStreamingSendOpenAIRequest;

/**
 * Put a temporarily overridden global back: delete it when it was originally
 * absent (undefined), otherwise reassign the saved value.
 *
 * @param {string} name - Property name on globalThis.
 * @param {*} originalValue - Value captured before the override.
 */
function restoreGlobal(name, originalValue) {
  if (originalValue === undefined) {
    delete globalThis[name];
  } else {
    globalThis[name] = originalValue;
  }
}

/**
 * Import the modules under test while the shim globals are installed. The
 * extensions shim reads `__llmStreamingExtensionSettings` when it is
 * evaluated, and `require` is exposed globally during loading. The previous
 * globals are restored in `finally`, so they do not leak even if one of the
 * imports throws.
 *
 * @returns {Promise<{createDefaultTaskProfiles: Function, llm: object, extensionsApi: object}>}
 */
async function importModulesUnderTest() {
  globalThis.__llmStreamingExtensionSettings = {
    st_bme: {},
  };
  globalThis.require = require;
  try {
    const { createDefaultTaskProfiles } = await import("../prompting/prompt-profiles.js");
    const llm = await import("../llm/llm.js");
    const extensionsApi = await import("../../../../extensions.js");
    return { createDefaultTaskProfiles, llm, extensionsApi };
  } finally {
    restoreGlobal("require", originalRequire);
    restoreGlobal("__llmStreamingExtensionSettings", originalExtensionSettings);
    restoreGlobal("__llmStreamingSendOpenAIRequest", originalSendOpenAIRequest);
  }
}

const { createDefaultTaskProfiles, llm, extensionsApi } = await importModulesUnderTest();
/**
 * Build a full `st_bme` settings object for the dedicated LLM route.
 *
 * The extract task's first profile gets its generation options merged with
 * `generation`. Keys in `overrides` replace the route defaults.
 *
 * @param {object} [generation] - Generation options layered onto the profile.
 * @param {object|null} [overrides] - Top-level settings that win over defaults.
 * @returns {object} Fresh settings object (new task profiles every call).
 */
function buildStreamingSettings(generation = {}, overrides = {}) {
  const taskProfiles = createDefaultTaskProfiles();
  const extractProfile = taskProfiles.extract.profiles[0];
  const mergedGeneration = { ...extractProfile.generation, ...generation };
  extractProfile.generation = mergedGeneration;

  const dedicatedRoute = {
    llmApiUrl: "https://example.com/v1",
    llmApiKey: "sk-stream-secret",
    llmModel: "gpt-stream-test",
    timeoutMs: 1234,
    taskProfilesVersion: 3,
    taskProfiles,
  };
  // Falsy primitives spread to nothing, so `??` here matches `||`.
  return { ...dedicatedRoute, ...(overrides ?? {}) };
}
/**
 * Build a `text/event-stream` Response whose body emits each event as one
 * `data: ...\n\n` SSE frame. String events are sent verbatim; anything else
 * is JSON-serialized.
 *
 * @param {Iterable<string|object>} [events] - Frames to emit, in order.
 * @param {number} [status] - HTTP status of the response.
 * @returns {Response}
 */
function createSseResponse(events = [], status = 200) {
  const encoder = new TextEncoder();
  const frames = Array.from(events, (event) => {
    const payload = typeof event === "string" ? event : JSON.stringify(event);
    return encoder.encode(`data: ${payload}\n\n`);
  });
  const body = new ReadableStream({
    start(controller) {
      frames.forEach((frame) => controller.enqueue(frame));
      controller.close();
    },
  });
  return new Response(body, {
    status,
    headers: {
      "Content-Type": "text/event-stream",
    },
  });
}
/**
 * Look up the recorded LLM request snapshot for a task in
 * `globalThis.__stBmeRuntimeDebugState.taskLlmRequests`.
 *
 * @param {string} [taskKey] - Task whose snapshot to read.
 * @returns {object|null} The snapshot, or null if the state, the request map,
 *   or the entry is missing.
 */
function getSnapshot(taskKey = "extract") {
  const requestsByTask = globalThis.__stBmeRuntimeDebugState?.taskLlmRequests;
  return requestsByTask?.[taskKey] || null;
}
/**
 * Run `run` with `extension_settings.st_bme` temporarily replaced.
 *
 * The replacement is the streaming test settings layered over a JSON deep
 * copy of the current ones. The runtime debug state is cleared first so
 * snapshots only reflect this run. Afterwards (even on error) `st_bme` is set
 * back to the copy.
 *
 * @param {object} generation - Generation options for the extract profile.
 * @param {() => Promise<void>} run - Scenario to execute.
 * @param {object} [overrides] - Top-level settings overrides.
 */
async function withStreamingSettings(generation, run, overrides = {}) {
  const settingsStore = extensionsApi.extension_settings;
  const savedSettings = JSON.parse(JSON.stringify(settingsStore.st_bme || {}));
  const testSettings = buildStreamingSettings(generation, overrides);
  settingsStore.st_bme = { ...savedSettings, ...testSettings };
  delete globalThis.__stBmeRuntimeDebugState;
  try {
    await run();
  } finally {
    settingsStore.st_bme = savedSettings;
  }
}
/**
 * Happy path for the dedicated streaming route.
 *
 * SSE deltas are assembled into the final JSON result. The debug snapshot
 * must record a completed, non-fallback stream whose request body had
 * `stream: true`.
 */
async function testDedicatedStreamingSuccess() {
  const savedFetch = globalThis.fetch;
  let requestCount = 0;
  globalThis.fetch = async () => {
    requestCount += 1;
    const frames = [
      { choices: [{ delta: { content: '{"ok":' } }] },
      { choices: [{ delta: { content: "true}" } }] },
      { choices: [{ finish_reason: "stop" }] },
      "[DONE]",
    ];
    return createSseResponse(frames);
  };

  const scenario = async () => {
    const parsed = await llm.callLLMForJSON({
      systemPrompt: "system",
      userPrompt: "user",
      maxRetries: 0,
      taskType: "extract",
      requestSource: "test:stream-success",
    });
    assert.deepEqual(parsed, { ok: true });
    assert.equal(requestCount, 1);

    const snap = getSnapshot("extract");
    assert.ok(snap);
    assert.equal(snap.streamRequested, true);
    assert.equal(snap.streamActive, false);
    assert.equal(snap.streamCompleted, true);
    assert.equal(snap.streamFallback, false);
    assert.equal(snap.streamFallbackSucceeded, false);
    assert.equal(snap.streamFinishReason, "stop");
    assert.ok(snap.streamChunkCount >= 2);
    assert.ok(snap.streamReceivedChars >= 10);
    assert.match(snap.streamPreviewText, /\{"ok":true\}/);
    assert.equal(snap.requestBody?.stream, true);
  };

  try {
    await withStreamingSettings({ stream: true }, scenario);
  } finally {
    globalThis.fetch = savedFetch;
  }
}
/**
 * Fallback path: the provider refuses the streaming request with an HTTP 400
 * mentioning streaming.
 *
 * The dedicated route must retry once without streaming. The snapshot must
 * record the fallback and its success, keep the profile's `stream: true` in
 * the filtered generation, and not contain the API key.
 */
async function testDedicatedStreamingFallsBackToNonStream() {
  const savedFetch = globalThis.fetch;
  const jsonResponse = (payload, status) =>
    new Response(JSON.stringify(payload), {
      status,
      headers: {
        "Content-Type": "application/json",
      },
    });
  let requestCount = 0;
  globalThis.fetch = async () => {
    requestCount += 1;
    // First call is the streaming attempt, which the provider rejects.
    if (requestCount === 1) {
      return jsonResponse(
        { error: { message: "Streaming is not supported by this provider" } },
        400,
      );
    }
    // Every later call succeeds as a plain completion.
    return jsonResponse(
      { choices: [{ message: { content: '{"ok":true}' }, finish_reason: "stop" }] },
      200,
    );
  };

  const scenario = async () => {
    const parsed = await llm.callLLMForJSON({
      systemPrompt: "system",
      userPrompt: "user",
      maxRetries: 0,
      taskType: "extract",
      requestSource: "test:stream-fallback",
    });
    assert.deepEqual(parsed, { ok: true });
    assert.equal(requestCount, 2);

    const snap = getSnapshot("extract");
    assert.ok(snap);
    assert.equal(snap.streamRequested, true);
    assert.equal(snap.streamCompleted, false);
    assert.equal(snap.streamFallback, true);
    assert.equal(snap.streamFallbackSucceeded, true);
    assert.match(snap.streamFallbackReason, /stream/i);
    assert.equal(snap.requestBody?.stream, false);
    assert.equal(snap.filteredGeneration?.stream, true);
    assert.equal(snap.redacted, true);
    assert.doesNotMatch(JSON.stringify(snap), /sk-stream-secret/);
  };

  try {
    await withStreamingSettings({ stream: true }, scenario);
  } finally {
    globalThis.fetch = savedFetch;
  }
}
/**
 * Aborting a dedicated streaming request mid-stream must reject with an
 * AbortError. The debug snapshot must be left settled: not active, not
 * completed, no fallback, and finish reason "aborted".
 */
async function testDedicatedStreamingAbortDoesNotLeaveActiveState() {
  const originalFetch = globalThis.fetch;
  const encoder = new TextEncoder();
  // Error to reject pending reads with once the request signal aborts.
  const abortReason = (signal) =>
    signal.reason || new DOMException("Aborted", "AbortError");

  // Fake fetch: the body yields one partial SSE frame, then every later
  // read() stays pending until the request's signal aborts.
  globalThis.fetch = async (_url, options = {}) => {
    const signal = options.signal;
    let readCount = 0;
    return {
      ok: true,
      status: 200,
      statusText: "OK",
      body: {
        getReader() {
          return {
            async read() {
              if (readCount === 0) {
                readCount += 1;
                return {
                  done: false,
                  value: encoder.encode(
                    'data: {"choices":[{"delta":{"content":"{\\"partial\\":"}}]}\n\n',
                  ),
                };
              }
              // "abort" is dispatched only once. If it already fired before
              // this read() was issued, a listener added now would never run
              // and the test would hang, so reject right away instead.
              if (signal?.aborted) {
                throw abortReason(signal);
              }
              return await new Promise((_resolve, reject) => {
                signal?.addEventListener(
                  "abort",
                  () => reject(abortReason(signal)),
                  { once: true },
                );
              });
            },
            releaseLock() {},
          };
        },
      },
      text: async () => "",
    };
  };
  try {
    await withStreamingSettings({ stream: true }, async () => {
      const controller = new AbortController();
      const promise = llm.callLLMForJSON({
        systemPrompt: "system",
        userPrompt: "user",
        maxRetries: 0,
        taskType: "extract",
        requestSource: "test:stream-abort",
        signal: controller.signal,
      });
      // Give the request a macrotask to start streaming before aborting.
      await new Promise((resolve) => setTimeout(resolve, 0));
      controller.abort(new DOMException("Aborted", "AbortError"));
      await assert.rejects(
        promise,
        (error) => error?.name === "AbortError",
      );
      const snapshot = getSnapshot("extract");
      assert.ok(snapshot);
      assert.equal(snapshot.streamRequested, true);
      assert.equal(snapshot.streamActive, false);
      assert.equal(snapshot.streamCompleted, false);
      assert.equal(snapshot.streamFallback, false);
      assert.equal(snapshot.streamFinishReason, "aborted");
    });
  } finally {
    globalThis.fetch = originalFetch;
  }
}
/**
 * A JSON-parse retry must keep the profile's completion-token limit.
 *
 * The first reply is not JSON and the retry succeeds. The final request body
 * must carry the profile's `max_completion_tokens` as `max_tokens`, with no
 * `max_completion_tokens` key of its own.
 */
async function testJsonRetryKeepsProfileCompletionTokens() {
  const savedFetch = globalThis.fetch;
  const completion = (content) =>
    new Response(
      JSON.stringify({ choices: [{ message: { content }, finish_reason: "stop" }] }),
      {
        status: 200,
        headers: {
          "Content-Type": "application/json",
        },
      },
    );
  let requestCount = 0;
  globalThis.fetch = async () => {
    requestCount += 1;
    // Reply with garbage first so the JSON retry path is exercised.
    return completion(requestCount === 1 ? "not-json" : '{"ok":true}');
  };

  const generation = {
    stream: false,
    max_completion_tokens: 7777,
  };
  const scenario = async () => {
    const parsed = await llm.callLLMForJSON({
      systemPrompt: "system",
      userPrompt: "user",
      maxRetries: 1,
      taskType: "extract",
      requestSource: "test:json-retry-keeps-profile-tokens",
    });
    assert.deepEqual(parsed, { ok: true });
    assert.equal(requestCount, 2);

    const snap = getSnapshot("extract");
    assert.ok(snap);
    assert.equal(snap.requestBody?.max_tokens, 7777);
    assert.equal(snap.requestBody?.max_completion_tokens, undefined);
    assert.equal(snap.filteredGeneration?.max_completion_tokens, 7777);
  };

  try {
    await withStreamingSettings(generation, scenario);
  } finally {
    globalThis.fetch = savedFetch;
  }
}
/**
 * An Anthropic `/v1/messages` URL must be sent as the backend's `claude`
 * completion source.
 *
 * The request uses `https://api.anthropic.com/v1` as `reverse_proxy`, the API
 * key as `proxy_password`, and a JSON schema. Streaming is force-disabled
 * even though the profile asks for it.
 */
async function testAnthropicRouteUsesReverseProxyAndDisablesStreaming() {
  const savedFetch = globalThis.fetch;
  let sentBody = null;
  globalThis.fetch = async (_url, options = {}) => {
    sentBody = JSON.parse(String(options.body || "{}"));
    const payload = {
      choices: [{ message: { content: '{"ok":true}' }, finish_reason: "stop" }],
    };
    return new Response(JSON.stringify(payload), {
      status: 200,
      headers: {
        "Content-Type": "application/json",
      },
    });
  };

  const anthropicOverrides = {
    llmApiUrl: "https://api.anthropic.com/v1/messages",
    llmModel: "claude-sonnet-4-5",
  };
  const scenario = async () => {
    const parsed = await llm.callLLMForJSON({
      systemPrompt: "system",
      userPrompt: "user",
      maxRetries: 0,
      taskType: "extract",
      requestSource: "test:anthropic-route",
    });
    assert.deepEqual(parsed, { ok: true });
    assert.equal(sentBody?.chat_completion_source, "claude");
    assert.equal(sentBody?.reverse_proxy, "https://api.anthropic.com/v1");
    assert.equal(sentBody?.proxy_password, "sk-stream-secret");
    assert.equal(sentBody?.stream, false);
    assert.ok(sentBody?.json_schema);

    const snap = getSnapshot("extract");
    assert.ok(snap);
    assert.equal(snap.route, "dedicated-anthropic-claude");
    assert.equal(snap.llmProviderLabel, "Anthropic Claude");
    assert.equal(snap.streamRequested, false);
    assert.equal(snap.streamForceDisabled, true);
  };

  try {
    await withStreamingSettings({ stream: true }, scenario, anthropicOverrides);
  } finally {
    globalThis.fetch = savedFetch;
  }
}
// Run the scenarios one at a time. Each replaces globalThis.fetch and
// extension_settings.st_bme while it runs, so they must not overlap.
const testCases = [
  testDedicatedStreamingSuccess,
  testDedicatedStreamingFallsBackToNonStream,
  testDedicatedStreamingAbortDoesNotLeaveActiveState,
  testJsonRetryKeepsProfileCompletionTokens,
  testAnthropicRouteUsesReverseProxyAndDisablesStreaming,
];
for (const runCase of testCases) {
  await runCase();
}
console.log("llm-streaming tests passed");