From 62902c98678abf4e2d724b76732f18c51df5ed4d Mon Sep 17 00:00:00 2001 From: Youzini-afk <13153778771cx@gmail.com> Date: Fri, 27 Mar 2026 13:38:02 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=B8=93=E7=94=A8Memory=20LLM=E6=B5=81?= =?UTF-8?q?=E5=BC=8F=E5=BA=95=E5=B1=82+=E8=87=AA=E5=8A=A8=E9=99=8D?= =?UTF-8?q?=E7=BA=A7+=E8=BF=90=E8=A1=8C=E6=97=B6=E8=B0=83=E8=AF=95?= =?UTF-8?q?=E7=8A=B6=E6=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- llm.js | 526 +++++++++++++++++++++++++++++++++++++--- tests/llm-streaming.mjs | 342 ++++++++++++++++++++++++++ 2 files changed, 835 insertions(+), 33 deletions(-) create mode 100644 tests/llm-streaming.mjs diff --git a/llm.js b/llm.js index fc271f3..db7c7ee 100644 --- a/llm.js +++ b/llm.js @@ -13,6 +13,8 @@ const LLM_REQUEST_TIMEOUT_MS = 300000; const DEFAULT_TEXT_COMPLETION_TOKENS = 64000; const DEFAULT_JSON_COMPLETION_TOKENS = 64000; const RETRY_JSON_COMPLETION_TOKENS = 3200; +const STREAM_DEBUG_PREVIEW_MAX_CHARS = 1200; +const STREAM_DEBUG_UPDATE_INTERVAL_MS = 120; const SENSITIVE_DEBUG_KEY_PATTERN = /^(authorization|proxy_password|api[_-]?key|access[_-]?token|refresh[_-]?token|secret|password)$/i; @@ -79,6 +81,10 @@ function sanitizeLlmDebugSnapshot(snapshot = {}) { return redacted; } +function nowIso() { + return new Date().toISOString(); +} + function getRuntimeDebugState() { const stateKey = "__stBmeRuntimeDebugState"; if ( @@ -254,6 +260,204 @@ function buildPromptExecutionSummary(debugContext = null) { }; } +function createStreamDebugState({ + requested = false, + fallback = false, + fallbackReason = "", + fallbackSucceeded = false, +} = {}) { + return { + requested: Boolean(requested), + active: false, + completed: false, + fallback: Boolean(fallback), + fallbackReason: String(fallbackReason || ""), + fallbackSucceeded: Boolean(fallbackSucceeded), + startedAt: "", + finishedAt: "", + chunkCount: 0, + receivedChars: 0, + previewText: "", + finishReason: "", + lastEventAt: "", + lastDebugUpdateAt: 0, + }; +} + +function buildStreamDebugSnapshot(streamState = {}) { + return { + streamRequested: Boolean(streamState.requested), + streamActive: Boolean(streamState.active), + streamCompleted: Boolean(streamState.completed), + streamFallback: Boolean(streamState.fallback), + streamFallbackReason: String(streamState.fallbackReason || ""), + streamFallbackSucceeded: Boolean(streamState.fallbackSucceeded), + streamStartedAt: String(streamState.startedAt || ""), + streamFinishedAt: String(streamState.finishedAt || ""), + streamChunkCount: Number(streamState.chunkCount || 0), + streamReceivedChars: Number(streamState.receivedChars || 0), + streamPreviewText: String(streamState.previewText || ""), + streamFinishReason: String(streamState.finishReason || ""), + streamLastEventAt: String(streamState.lastEventAt || ""), + }; +} + +function recordTaskLlmStreamState( + taskKey, + streamState, + extraSnapshot = {}, + { force = false } = {}, +) { + if (!taskKey || !streamState) return; + + const now = Date.now(); + if ( + !force && + streamState.lastDebugUpdateAt && + now - streamState.lastDebugUpdateAt < STREAM_DEBUG_UPDATE_INTERVAL_MS + ) { + return; + } + + streamState.lastDebugUpdateAt = now; + recordTaskLlmRequest( + taskKey, + { + ...buildStreamDebugSnapshot(streamState), + ...extraSnapshot, + }, + { + merge: true, + }, + ); +} + +function appendStreamPreview(existingPreview = "", deltaText = "") { + const combined = `${String(existingPreview || "")}${String(deltaText || "")}`; + if (combined.length <= STREAM_DEBUG_PREVIEW_MAX_CHARS) { + return combined; + } + return combined.slice(-STREAM_DEBUG_PREVIEW_MAX_CHARS); +} + +function extractTextLikeValue(value) { + if (value == null) return ""; + if (typeof value === "string") return value; + if (typeof value === "number" || typeof value === "boolean") { + return String(value); + } + if (Array.isArray(value)) { + return value + .map((item) => + extractTextLikeValue(item?.text ?? item?.content ?? item), + ) + .join(""); + } + if (typeof value === "object") { + return extractTextLikeValue(value.text ?? value.content ?? ""); + } + return ""; +} + +function extractStreamingChoice(payload = {}) { + return payload?.choices?.[0] || {}; +} + +function extractStreamingContentDelta(payload = {}) { + const choice = extractStreamingChoice(payload); + return extractTextLikeValue( + choice?.delta?.content ?? + choice?.message?.content ?? + choice?.text ?? + payload?.content ?? + payload?.text ?? + "", + ); +} + +function extractStreamingReasoningDelta(payload = {}) { + const choice = extractStreamingChoice(payload); + return extractTextLikeValue( + choice?.delta?.reasoning_content ?? + choice?.delta?.reasoning ?? + choice?.message?.reasoning_content ?? + payload?.reasoning ?? + "", + ); +} + +function extractStreamingFinishReason(payload = {}) { + const choice = extractStreamingChoice(payload); + return String( + choice?.finish_reason ?? + payload?.finish_reason ?? + payload?.stop_reason ?? + "", + ); +} + +function extractErrorMessageFromPayload(payload = {}) { + if (typeof payload === "string") { + return payload; + } + return String( + payload?.error?.message ?? + payload?.message ?? + payload?.detail ?? + payload?.error ?? + "", + ).trim(); +} + +function looksLikeJsonModeUnsupportedMessage(message = "") { + return /(response_format|json[_-\s]?mode|json[_-\s]?object|json schema|structured output)/i.test( + String(message || ""), + ); +} + +function looksLikeStreamUnsupportedMessage(message = "") { + return /(stream|streaming|sse|event[-\s]?stream|text\/event-stream)/i.test( + String(message || ""), + ); +} + +function createStreamHandlingError( + message, + code = "stream_error", + options = {}, +) { + const error = new Error(String(message || "流式请求失败")); + error.name = "StreamHandlingError"; + error.code = code; + error.fallbackable = options?.fallbackable !== false; + error.status = Number.isFinite(Number(options?.status)) + ? Number(options.status) + : 0; + return error; +} + +function isStreamHandlingError(error) { + return error?.name === "StreamHandlingError"; +} + +function shouldFallbackToNonStream(error) { + return isStreamHandlingError(error) && error?.fallbackable !== false; +} + +function buildResponseErrorMessage(response, responseText = "") { + const rawText = String(responseText || "").trim(); + if (!rawText) { + return String(response?.statusText || ""); + } + + try { + const parsed = JSON.parse(rawText); + return extractErrorMessageFromPayload(parsed) || rawText; + } catch { + return rawText; + } +} + function normalizeOpenAICompatibleBaseUrl(value) { return String(value || "") .trim() @@ -630,6 +834,243 @@ function isAbortError(error) { return error?.name === "AbortError"; } +async function parseDedicatedStreamingResponse( + response, + { taskKey = "", streamState = null } = {}, +) { + const reader = response?.body?.getReader?.(); + if (!reader) { + throw createStreamHandlingError( + "专用 LLM 返回的响应体不可流式读取", + "missing_stream_body", + ); + } + + const decoder = new TextDecoder(); + let buffer = ""; + let content = ""; + let reasoningContent = ""; + let finishReason = ""; + let sawStreamEvent = false; + + streamState.active = true; + streamState.completed = false; + streamState.startedAt = streamState.startedAt || nowIso(); + streamState.finishedAt = ""; + recordTaskLlmStreamState(taskKey, streamState, {}, { force: true }); + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) { + break; + } + + buffer += decoder.decode(value, { stream: true }).replace(/\r\n/g, "\n"); + + while (true) { + const boundaryIndex = buffer.indexOf("\n\n"); + if (boundaryIndex < 0) { + break; + } + + const eventBlock = buffer.slice(0, boundaryIndex).trim(); + buffer = buffer.slice(boundaryIndex + 2); + if (!eventBlock) { + continue; + } + + const dataLines = eventBlock + .split("\n") + .filter((line) => line.startsWith("data:")) + .map((line) => line.slice(5).trimStart()); + if (!dataLines.length) { + continue; + } + + const rawData = dataLines.join("\n").trim(); + if (!rawData) { + continue; + } + if (rawData === "[DONE]") { + sawStreamEvent = true; + streamState.lastEventAt = nowIso(); + break; + } + + let parsed; + try { + parsed = JSON.parse(rawData); + } catch (error) { + throw createStreamHandlingError( + "专用 LLM 返回了无法解析的 SSE 数据块", + "invalid_sse_chunk", + { + fallbackable: true, + }, + ); + } + + const payloadErrorMessage = extractErrorMessageFromPayload(parsed); + if (payloadErrorMessage) { + throw createStreamHandlingError( + payloadErrorMessage, + "stream_payload_error", + { + fallbackable: + looksLikeStreamUnsupportedMessage(payloadErrorMessage), + }, + ); + } + + sawStreamEvent = true; + streamState.chunkCount += 1; + streamState.lastEventAt = nowIso(); + + const deltaText = extractStreamingContentDelta(parsed); + const reasoningDelta = extractStreamingReasoningDelta(parsed); + const nextFinishReason = extractStreamingFinishReason(parsed); + + if (deltaText) { + content += deltaText; + streamState.receivedChars += deltaText.length; + streamState.previewText = appendStreamPreview( + streamState.previewText, + deltaText, + ); + } + + if (reasoningDelta) { + reasoningContent += reasoningDelta; + } + + if (nextFinishReason) { + finishReason = nextFinishReason; + streamState.finishReason = nextFinishReason; + } + + recordTaskLlmStreamState(taskKey, streamState, {}); + } + } + + buffer += decoder.decode(); + if (!sawStreamEvent) { + throw createStreamHandlingError( + "专用 LLM 未返回可识别的 SSE 事件流", + "invalid_sse_stream", + ); + } + + streamState.active = false; + streamState.completed = true; + streamState.finishedAt = nowIso(); + if (finishReason) { + streamState.finishReason = finishReason; + } + recordTaskLlmStreamState(taskKey, streamState, {}, { force: true }); + + return { + content: String(content || "").trim(), + finishReason: String(finishReason || ""), + reasoningContent: String(reasoningContent || ""), + raw: { + mode: "stream", + chunkCount: streamState.chunkCount, + }, + }; + } catch (error) { + streamState.active = false; + streamState.completed = false; + streamState.finishedAt = nowIso(); + if (isAbortError(error)) { + streamState.finishReason = "aborted"; + } + recordTaskLlmStreamState(taskKey, streamState, {}, { force: true }); + throw error; + } finally { + try { + reader.releaseLock?.(); + } catch { + // ignore + } + } +} + +async function executeDedicatedRequest( + body, + { + signal, + timeoutMs = LLM_REQUEST_TIMEOUT_MS, + jsonMode = false, + taskKey = "", + streamState = null, + } = {}, +) { + const requestBody = cloneRuntimeDebugValue(body, {}) || {}; + + while (true) { + recordTaskLlmRequest( + taskKey, + { + requestBody: requestBody, + }, + { + merge: true, + }, + ); + + const response = await fetchWithTimeout( + "/api/backends/chat-completions/generate", + { + method: "POST", + headers: getRequestHeaders(), + body: JSON.stringify(requestBody), + signal, + }, + timeoutMs, + ); + + if (!response.ok) { + const responseText = await response.text().catch(() => ""); + const message = buildResponseErrorMessage(response, responseText); + if ( + jsonMode && + _jsonModeSupported && + response.status === 400 && + looksLikeJsonModeUnsupportedMessage(message) + ) { + console.warn("[ST-BME] API 不支持 json mode,降级为普通 JSON 提示模式"); + _jsonModeSupported = false; + delete requestBody.custom_include_body; + continue; + } + + if (requestBody.stream === true && looksLikeStreamUnsupportedMessage(message)) { + throw createStreamHandlingError( + message || `Memory LLM proxy error ${response.status}`, + "stream_http_error", + { + status: response.status, + }, + ); + } + + throw new Error( + `Memory LLM proxy error ${response.status}: ${message || response.statusText}`, + ); + } + + if (requestBody.stream === true) { + return await parseDedicatedStreamingResponse(response, { + taskKey, + streamState, + }); + } + + return await _parseResponse(response); + } +} + async function callDedicatedOpenAICompatible( messages, { @@ -663,6 +1104,13 @@ async function callDedicatedOpenAICompatible( filtered: {}, removed: [], }; + const taskKey = taskType || privateRequestSource; + const initialFilteredGeneration = generationResolved.filtered || {}; + const streamRequested = + hasDedicatedConfig && initialFilteredGeneration.stream === true; + const streamState = createStreamDebugState({ + requested: streamRequested, + }); recordTaskLlmRequest(taskType || privateRequestSource, { requestSource: privateRequestSource, taskType: String(taskType || "").trim(), @@ -684,6 +1132,7 @@ async function callDedicatedOpenAICompatible( taskType, ), maxCompletionTokens, + ...buildStreamDebugSnapshot(streamState), }); if (!hasDedicatedConfig) { const payload = await sendOpenAIRequest( @@ -759,7 +1208,7 @@ async function callDedicatedOpenAICompatible( body[field] = filteredGeneration[field]; } - if (jsonMode) { + if (jsonMode && _jsonModeSupported) { body.custom_include_body = buildYamlObject({ response_format: { type: "json_object", @@ -767,7 +1216,7 @@ async function callDedicatedOpenAICompatible( }); } - recordTaskLlmRequest(taskType || privateRequestSource, { + recordTaskLlmRequest(taskKey, { requestSource: privateRequestSource, taskType: String(taskType || "").trim(), jsonMode, @@ -787,43 +1236,54 @@ async function callDedicatedOpenAICompatible( taskType, ), requestBody: body, + ...buildStreamDebugSnapshot(streamState), }); - const response = await fetchWithTimeout( - "/api/backends/chat-completions/generate", - { - method: "POST", - headers: getRequestHeaders(), - body: JSON.stringify(body), + try { + return await executeDedicatedRequest(body, { signal, - }, - config.timeoutMs, - ); + timeoutMs: config.timeoutMs, + jsonMode, + taskKey, + streamState, + }); + } catch (error) { + if ( + !streamRequested || + !shouldFallbackToNonStream(error) || + isAbortError(error) + ) { + throw error; + } - // 如果 400 且带了 structured output,可能是 API 不支持,降级重试 - if ( - !response.ok && - response.status === 400 && - jsonMode && - _jsonModeSupported - ) { - console.warn("[ST-BME] API 不支持 json mode,降级为普通 JSON 提示模式"); - _jsonModeSupported = false; - delete body.custom_include_body; - const retryResponse = await fetchWithTimeout( - "/api/backends/chat-completions/generate", - { - method: "POST", - headers: getRequestHeaders(), - body: JSON.stringify(body), - signal, - }, - config.timeoutMs, + streamState.active = false; + streamState.completed = false; + streamState.fallback = true; + streamState.fallbackReason = error?.message || String(error); + streamState.finishedAt = nowIso(); + recordTaskLlmStreamState(taskKey, streamState, {}, { force: true }); + + console.warn( + `[ST-BME] 专用 LLM 流式不可用,已自动降级为非流式: ${streamState.fallbackReason}`, ); - return await _parseResponse(retryResponse); - } - return await _parseResponse(response); + const fallbackBody = { + ...body, + stream: false, + }; + + const fallbackResponse = await executeDedicatedRequest(fallbackBody, { + signal, + timeoutMs: config.timeoutMs, + jsonMode, + taskKey, + streamState, + }); + + streamState.fallbackSucceeded = true; + recordTaskLlmStreamState(taskKey, streamState, {}, { force: true }); + return fallbackResponse; + } } async function _parseResponse(response) { diff --git a/tests/llm-streaming.mjs b/tests/llm-streaming.mjs new file mode 100644 index 0000000..f4a3f0a --- /dev/null +++ b/tests/llm-streaming.mjs @@ -0,0 +1,342 @@ +import assert from "node:assert/strict"; +import { createRequire, registerHooks } from "node:module"; + +const extensionsShimSource = [ + "export const extension_settings = globalThis.__llmStreamingExtensionSettings || {};", + "export function getContext() {", + " return null;", + "}", +].join("\n"); +const scriptShimSource = [ + "export function getRequestHeaders() {", + " return { 'Content-Type': 'application/json' };", + "}", +].join("\n"); +const openAiShimSource = [ + "export const chat_completion_sources = { CUSTOM: 'custom', OPENAI: 'openai' };", + "export async function sendOpenAIRequest(...args) {", + " if (typeof globalThis.__llmStreamingSendOpenAIRequest === 'function') {", + " return await globalThis.__llmStreamingSendOpenAIRequest(...args);", + " }", + " return { choices: [{ message: { content: '{}' } }] };", + "}", +].join("\n"); + +registerHooks({ + resolve(specifier, context, nextResolve) { + if ( + specifier === "../../../extensions.js" || + specifier === "../../../../extensions.js" + ) { + return { + shortCircuit: true, + url: `data:text/javascript,${encodeURIComponent(extensionsShimSource)}`, + }; + } + if (specifier === "../../../../script.js") { + return { + shortCircuit: true, + url: `data:text/javascript,${encodeURIComponent(scriptShimSource)}`, + }; + } + if (specifier === "../../../openai.js") { + return { + shortCircuit: true, + url: `data:text/javascript,${encodeURIComponent(openAiShimSource)}`, + }; + } + return nextResolve(specifier, context); + }, +}); + +const require = createRequire(import.meta.url); +const originalRequire = globalThis.require; +const originalExtensionSettings = globalThis.__llmStreamingExtensionSettings; +const originalSendOpenAIRequest = globalThis.__llmStreamingSendOpenAIRequest; + +globalThis.__llmStreamingExtensionSettings = { + st_bme: {}, +}; +globalThis.require = require; + +const { createDefaultTaskProfiles } = await import("../prompt-profiles.js"); +const llm = await import("../llm.js"); +const extensionsApi = await import("../../../../extensions.js"); + +if (originalRequire === undefined) { + delete globalThis.require; +} else { + globalThis.require = originalRequire; +} + +if (originalExtensionSettings === undefined) { + delete globalThis.__llmStreamingExtensionSettings; +} else { + globalThis.__llmStreamingExtensionSettings = originalExtensionSettings; +} + +if (originalSendOpenAIRequest === undefined) { + delete globalThis.__llmStreamingSendOpenAIRequest; +} else { + globalThis.__llmStreamingSendOpenAIRequest = originalSendOpenAIRequest; +} + +function buildStreamingSettings(generation = {}) { + const taskProfiles = createDefaultTaskProfiles(); + taskProfiles.extract.profiles[0].generation = { + ...taskProfiles.extract.profiles[0].generation, + ...generation, + }; + return { + llmApiUrl: "https://example.com/v1", + llmApiKey: "sk-stream-secret", + llmModel: "gpt-stream-test", + timeoutMs: 1234, + taskProfilesVersion: 3, + taskProfiles, + }; +} + +function createSseResponse(events = [], status = 200) { + const encoder = new TextEncoder(); + return new Response( + new ReadableStream({ + start(controller) { + for (const event of events) { + const payload = + typeof event === "string" ? event : JSON.stringify(event); + controller.enqueue(encoder.encode(`data: ${payload}\n\n`)); + } + controller.close(); + }, + }), + { + status, + headers: { + "Content-Type": "text/event-stream", + }, + }, + ); +} + +function getSnapshot(taskKey = "extract") { + return globalThis.__stBmeRuntimeDebugState?.taskLlmRequests?.[taskKey] || null; +} + +async function withStreamingSettings(generation, run) { + const previousSettings = JSON.parse( + JSON.stringify(extensionsApi.extension_settings.st_bme || {}), + ); + extensionsApi.extension_settings.st_bme = { + ...previousSettings, + ...buildStreamingSettings(generation), + }; + delete globalThis.__stBmeRuntimeDebugState; + + try { + await run(); + } finally { + extensionsApi.extension_settings.st_bme = previousSettings; + } +} + +async function testDedicatedStreamingSuccess() { + const originalFetch = globalThis.fetch; + let fetchCount = 0; + + globalThis.fetch = async () => { + fetchCount += 1; + return createSseResponse([ + { choices: [{ delta: { content: '{"ok":' } }] }, + { choices: [{ delta: { content: "true}" } }] }, + { choices: [{ finish_reason: "stop" }] }, + "[DONE]", + ]); + }; + + try { + await withStreamingSettings({ stream: true }, async () => { + const result = await llm.callLLMForJSON({ + systemPrompt: "system", + userPrompt: "user", + maxRetries: 0, + taskType: "extract", + requestSource: "test:stream-success", + }); + + assert.deepEqual(result, { ok: true }); + assert.equal(fetchCount, 1); + + const snapshot = getSnapshot("extract"); + assert.ok(snapshot); + assert.equal(snapshot.streamRequested, true); + assert.equal(snapshot.streamActive, false); + assert.equal(snapshot.streamCompleted, true); + assert.equal(snapshot.streamFallback, false); + assert.equal(snapshot.streamFallbackSucceeded, false); + assert.equal(snapshot.streamFinishReason, "stop"); + assert.ok(snapshot.streamChunkCount >= 2); + assert.ok(snapshot.streamReceivedChars >= 10); + assert.match(snapshot.streamPreviewText, /\{"ok":true\}/); + assert.equal(snapshot.requestBody?.stream, true); + }); + } finally { + globalThis.fetch = originalFetch; + } +} + +async function testDedicatedStreamingFallsBackToNonStream() { + const originalFetch = globalThis.fetch; + let fetchCount = 0; + + globalThis.fetch = async () => { + fetchCount += 1; + if (fetchCount === 1) { + return new Response( + JSON.stringify({ + error: { + message: "Streaming is not supported by this provider", + }, + }), + { + status: 400, + headers: { + "Content-Type": "application/json", + }, + }, + ); + } + + return new Response( + JSON.stringify({ + choices: [ + { + message: { + content: '{"ok":true}', + }, + finish_reason: "stop", + }, + ], + }), + { + status: 200, + headers: { + "Content-Type": "application/json", + }, + }, + ); + }; + + try { + await withStreamingSettings({ stream: true }, async () => { + const result = await llm.callLLMForJSON({ + systemPrompt: "system", + userPrompt: "user", + maxRetries: 0, + taskType: "extract", + requestSource: "test:stream-fallback", + }); + + assert.deepEqual(result, { ok: true }); + assert.equal(fetchCount, 2); + + const snapshot = getSnapshot("extract"); + assert.ok(snapshot); + assert.equal(snapshot.streamRequested, true); + assert.equal(snapshot.streamCompleted, false); + assert.equal(snapshot.streamFallback, true); + assert.equal(snapshot.streamFallbackSucceeded, true); + assert.match(snapshot.streamFallbackReason, /stream/i); + assert.equal(snapshot.requestBody?.stream, false); + assert.equal(snapshot.filteredGeneration?.stream, true); + assert.equal(snapshot.redacted, true); + assert.doesNotMatch(JSON.stringify(snapshot), /sk-stream-secret/); + }); + } finally { + globalThis.fetch = originalFetch; + } +} + +async function testDedicatedStreamingAbortDoesNotLeaveActiveState() { + const originalFetch = globalThis.fetch; + const encoder = new TextEncoder(); + + globalThis.fetch = async (_url, options = {}) => { + const signal = options.signal; + let readCount = 0; + return { + ok: true, + status: 200, + statusText: "OK", + body: { + getReader() { + return { + async read() { + if (readCount === 0) { + readCount += 1; + return { + done: false, + value: encoder.encode( + 'data: {"choices":[{"delta":{"content":"{\\"partial\\":"}}]}\n\n', + ), + }; + } + + return await new Promise((resolve, reject) => { + signal?.addEventListener( + "abort", + () => + reject( + signal.reason || + new DOMException("Aborted", "AbortError"), + ), + { once: true }, + ); + }); + }, + releaseLock() {}, + }; + }, + }, + text: async () => "", + }; + }; + + try { + await withStreamingSettings({ stream: true }, async () => { + const controller = new AbortController(); + const promise = llm.callLLMForJSON({ + systemPrompt: "system", + userPrompt: "user", + maxRetries: 0, + taskType: "extract", + requestSource: "test:stream-abort", + signal: controller.signal, + }); + + await new Promise((resolve) => setTimeout(resolve, 0)); + controller.abort(new DOMException("Aborted", "AbortError")); + + await assert.rejects( + promise, + (error) => error?.name === "AbortError", + ); + + const snapshot = getSnapshot("extract"); + assert.ok(snapshot); + assert.equal(snapshot.streamRequested, true); + assert.equal(snapshot.streamActive, false); + assert.equal(snapshot.streamCompleted, false); + assert.equal(snapshot.streamFallback, false); + assert.equal(snapshot.streamFinishReason, "aborted"); + }); + } finally { + globalThis.fetch = originalFetch; + } +} + +await testDedicatedStreamingSuccess(); +await testDedicatedStreamingFallsBackToNonStream(); +await testDedicatedStreamingAbortDoesNotLeaveActiveState(); + +console.log("llm-streaming tests passed");