feat: 专用Memory LLM流式底层+自动降级+运行时调试状态

This commit is contained in:
Youzini-afk
2026-03-27 13:38:02 +08:00
parent 27bf5128f6
commit 62902c9867
2 changed files with 835 additions and 33 deletions

526
llm.js
View File

@@ -13,6 +13,8 @@ const LLM_REQUEST_TIMEOUT_MS = 300000;
// Completion-token budgets for plain-text vs. JSON requests; the retry
// budget is deliberately smaller (presumably for the JSON repair pass —
// TODO confirm against the retry call site).
const DEFAULT_TEXT_COMPLETION_TOKENS = 64000;
const DEFAULT_JSON_COMPLETION_TOKENS = 64000;
const RETRY_JSON_COMPLETION_TOKENS = 3200;
// Cap on how many trailing characters of streamed text are kept for the
// debug preview (see appendStreamPreview).
const STREAM_DEBUG_PREVIEW_MAX_CHARS = 1200;
// Throttle interval between stream-debug snapshot writes
// (see recordTaskLlmStreamState).
const STREAM_DEBUG_UPDATE_INTERVAL_MS = 120;
// Keys whose values are redacted from debug snapshots before display.
const SENSITIVE_DEBUG_KEY_PATTERN =
/^(authorization|proxy_password|api[_-]?key|access[_-]?token|refresh[_-]?token|secret|password)$/i;
@@ -79,6 +81,10 @@ function sanitizeLlmDebugSnapshot(snapshot = {}) {
return redacted;
}
/** Current wall-clock time as an ISO-8601 UTC string. */
function nowIso() {
  const timestamp = new Date();
  return timestamp.toISOString();
}
function getRuntimeDebugState() {
const stateKey = "__stBmeRuntimeDebugState";
if (
@@ -254,6 +260,204 @@ function buildPromptExecutionSummary(debugContext = null) {
};
}
/**
 * Build a fresh, mutable per-request stream debug record.
 *
 * @param {object} [options]
 * @param {boolean} [options.requested=false] - streaming was asked for
 * @param {boolean} [options.fallback=false] - non-stream fallback is in effect
 * @param {string} [options.fallbackReason=""] - human-readable fallback cause
 * @param {boolean} [options.fallbackSucceeded=false] - fallback request succeeded
 * @returns {object} state consumed/mutated by the stream debug helpers
 */
function createStreamDebugState(options = {}) {
  const {
    requested = false,
    fallback = false,
    fallbackReason = "",
    fallbackSucceeded = false,
  } = options;
  const state = {
    requested: Boolean(requested),
    active: false,
    completed: false,
    fallback: Boolean(fallback),
    fallbackReason: String(fallbackReason || ""),
    fallbackSucceeded: Boolean(fallbackSucceeded),
    startedAt: "",
    finishedAt: "",
    chunkCount: 0,
    receivedChars: 0,
    previewText: "",
    finishReason: "",
    lastEventAt: "",
    lastDebugUpdateAt: 0,
  };
  return state;
}
/**
 * Project a mutable stream state into the flat, serializable snapshot
 * shape that gets merged into the task's LLM debug record.
 */
function buildStreamDebugSnapshot(streamState = {}) {
  const flag = (value) => Boolean(value);
  const text = (value) => String(value || "");
  const count = (value) => Number(value || 0);
  return {
    streamRequested: flag(streamState.requested),
    streamActive: flag(streamState.active),
    streamCompleted: flag(streamState.completed),
    streamFallback: flag(streamState.fallback),
    streamFallbackReason: text(streamState.fallbackReason),
    streamFallbackSucceeded: flag(streamState.fallbackSucceeded),
    streamStartedAt: text(streamState.startedAt),
    streamFinishedAt: text(streamState.finishedAt),
    streamChunkCount: count(streamState.chunkCount),
    streamReceivedChars: count(streamState.receivedChars),
    streamPreviewText: text(streamState.previewText),
    streamFinishReason: text(streamState.finishReason),
    streamLastEventAt: text(streamState.lastEventAt),
  };
}
/**
 * Merge the current stream state (plus optional extras) into the task's
 * LLM debug record, throttled to STREAM_DEBUG_UPDATE_INTERVAL_MS unless
 * `force` is set. No-op without a task key or state object.
 */
function recordTaskLlmStreamState(
  taskKey,
  streamState,
  extraSnapshot = {},
  { force = false } = {},
) {
  if (!taskKey || !streamState) {
    return;
  }
  const now = Date.now();
  const lastUpdate = streamState.lastDebugUpdateAt;
  const withinThrottleWindow =
    Boolean(lastUpdate) && now - lastUpdate < STREAM_DEBUG_UPDATE_INTERVAL_MS;
  if (withinThrottleWindow && !force) {
    return;
  }
  streamState.lastDebugUpdateAt = now;
  const snapshot = {
    ...buildStreamDebugSnapshot(streamState),
    ...extraSnapshot,
  };
  recordTaskLlmRequest(taskKey, snapshot, { merge: true });
}
/**
 * Append streamed delta text to the rolling preview, keeping only the
 * trailing `maxChars` characters.
 *
 * Generalized: the cap is now a parameter defaulting to the previous
 * hard-coded STREAM_DEBUG_PREVIEW_MAX_CHARS, so existing callers are
 * unaffected while other previews can pick their own size.
 *
 * @param {string} [existingPreview=""] - preview accumulated so far
 * @param {string} [deltaText=""] - newly received chunk text
 * @param {number} [maxChars=STREAM_DEBUG_PREVIEW_MAX_CHARS] - size cap
 * @returns {string} combined preview, truncated from the front when over the cap
 */
function appendStreamPreview(
  existingPreview = "",
  deltaText = "",
  maxChars = STREAM_DEBUG_PREVIEW_MAX_CHARS,
) {
  const combined = `${String(existingPreview || "")}${String(deltaText || "")}`;
  if (combined.length <= maxChars) {
    return combined;
  }
  // Keep the tail: the newest streamed text is the useful part of a preview.
  return combined.slice(-maxChars);
}
/**
 * Best-effort flattening of an LLM payload fragment into plain text.
 * Strings pass through; numbers/booleans stringify; arrays and objects
 * are walked recursively through their `text` / `content` members.
 * Anything unrecognized yields the empty string.
 */
function extractTextLikeValue(value) {
  if (value == null) {
    return "";
  }
  switch (typeof value) {
    case "string":
      return value;
    case "number":
    case "boolean":
      return String(value);
    default:
      break;
  }
  if (Array.isArray(value)) {
    const parts = value.map((item) =>
      extractTextLikeValue(item?.text ?? item?.content ?? item),
    );
    return parts.join("");
  }
  if (typeof value === "object") {
    return extractTextLikeValue(value.text ?? value.content ?? "");
  }
  return "";
}
/** First entry of `payload.choices`, or an empty object when absent. */
function extractStreamingChoice(payload = {}) {
  const firstChoice = payload?.choices?.[0];
  return firstChoice || {};
}
/**
 * Pull the incremental text content out of one streaming SSE payload,
 * tolerating both delta-style and full-message-style response shapes.
 */
function extractStreamingContentDelta(payload = {}) {
  const choice = extractStreamingChoice(payload);
  const candidate =
    choice?.delta?.content ??
    choice?.message?.content ??
    choice?.text ??
    payload?.content ??
    payload?.text ??
    "";
  return extractTextLikeValue(candidate);
}
/**
 * Pull the incremental reasoning ("thinking") text out of one streaming
 * SSE payload across the known provider field variants.
 */
function extractStreamingReasoningDelta(payload = {}) {
  const choice = extractStreamingChoice(payload);
  const candidate =
    choice?.delta?.reasoning_content ??
    choice?.delta?.reasoning ??
    choice?.message?.reasoning_content ??
    payload?.reasoning ??
    "";
  return extractTextLikeValue(candidate);
}
/**
 * Finish reason reported by a streaming payload, checking the choice
 * first and then payload-level fields; empty string when none is set.
 */
function extractStreamingFinishReason(payload = {}) {
  const choice = extractStreamingChoice(payload);
  const reason =
    choice?.finish_reason ?? payload?.finish_reason ?? payload?.stop_reason ?? "";
  return String(reason);
}
/**
 * Extract a trimmed error message from an API payload, checking the
 * common locations (`error.message`, `message`, `detail`, `error`).
 * A bare string payload is returned as-is (untrimmed).
 */
function extractErrorMessageFromPayload(payload = {}) {
  if (typeof payload === "string") {
    return payload;
  }
  const candidate =
    payload?.error?.message ??
    payload?.message ??
    payload?.detail ??
    payload?.error ??
    "";
  return String(candidate).trim();
}
/**
 * Heuristic: does this error message indicate the backend rejected
 * structured-output / JSON-mode request options?
 */
function looksLikeJsonModeUnsupportedMessage(message = "") {
  const text = String(message || "");
  const jsonModePattern =
    /(response_format|json[_-\s]?mode|json[_-\s]?object|json schema|structured output)/i;
  return jsonModePattern.test(text);
}
/**
 * Heuristic: does this error message indicate the backend rejected
 * streaming / SSE delivery specifically?
 */
function looksLikeStreamUnsupportedMessage(message = "") {
  const text = String(message || "");
  const streamPattern = /(stream|streaming|sse|event[-\s]?stream|text\/event-stream)/i;
  return streamPattern.test(text);
}
/**
 * Build a tagged Error describing a streaming failure.
 * `fallbackable` (default true) marks whether the caller may retry the
 * same request without streaming; `status` carries the HTTP status when
 * known, otherwise 0.
 */
function createStreamHandlingError(message, code = "stream_error", options = {}) {
  const error = new Error(String(message || "流式请求失败"));
  error.name = "StreamHandlingError";
  error.code = code;
  error.fallbackable = options?.fallbackable !== false;
  const statusValue = Number(options?.status);
  error.status = Number.isFinite(statusValue) ? statusValue : 0;
  return error;
}
/** True when `error` is one of our tagged stream-handling errors. */
function isStreamHandlingError(error) {
  if (!error) {
    return false;
  }
  return error.name === "StreamHandlingError";
}
/**
 * A stream-handling error warrants a non-streaming retry unless it was
 * explicitly marked `fallbackable: false`.
 */
function shouldFallbackToNonStream(error) {
  if (!isStreamHandlingError(error)) {
    return false;
  }
  return error?.fallbackable !== false;
}
/**
 * Derive a human-readable message from a failed HTTP response: prefer
 * structured JSON error fields in the body, fall back to the raw body
 * text, and finally to the HTTP status text when the body is empty.
 */
function buildResponseErrorMessage(response, responseText = "") {
  const rawText = String(responseText || "").trim();
  if (rawText === "") {
    return String(response?.statusText || "");
  }
  let parsed;
  try {
    parsed = JSON.parse(rawText);
  } catch {
    // Body is not JSON; the raw text is the best message we have.
    return rawText;
  }
  return extractErrorMessageFromPayload(parsed) || rawText;
}
function normalizeOpenAICompatibleBaseUrl(value) {
return String(value || "")
.trim()
@@ -630,6 +834,243 @@ function isAbortError(error) {
return error?.name === "AbortError";
}
/**
 * Consume an OpenAI-compatible SSE response body, accumulating the
 * streamed content / reasoning deltas while mirroring progress into the
 * task's runtime debug record.
 *
 * Fixes over the previous version:
 * 1. `streamState` defaulted to null but was dereferenced unconditionally
 *    (`streamState.active = true`), so calling without it threw a
 *    TypeError; a throwaway local state is now substituted.
 * 2. CRLF normalization ran per decoded chunk, so a "\r\n" pair split
 *    across two network chunks was never collapsed; the whole buffer is
 *    normalized instead.
 *
 * @param {Response} response - fetch Response whose body is an SSE stream
 * @param {object} [options]
 * @param {string} [options.taskKey=""] - debug record key; empty disables debug writes
 * @param {object|null} [options.streamState=null] - mutable stream debug state
 * @returns {Promise<{content: string, finishReason: string, reasoningContent: string, raw: object}>}
 * @throws StreamHandlingError when the body is unreadable, a chunk cannot
 *   be parsed, the payload reports an error, or no SSE event arrives.
 */
async function parseDedicatedStreamingResponse(
  response,
  { taskKey = "", streamState = null } = {},
) {
  const reader = response?.body?.getReader?.();
  if (!reader) {
    throw createStreamHandlingError(
      "专用 LLM 返回的响应体不可流式读取",
      "missing_stream_body",
    );
  }
  // Guard against the null default: fall back to a local throwaway state.
  const state = streamState || createStreamDebugState();
  const decoder = new TextDecoder();
  let buffer = "";
  let content = "";
  let reasoningContent = "";
  let finishReason = "";
  let sawStreamEvent = false;
  state.active = true;
  state.completed = false;
  state.startedAt = state.startedAt || nowIso();
  state.finishedAt = "";
  recordTaskLlmStreamState(taskKey, state, {}, { force: true });
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) {
        break;
      }
      // Normalize CRLF on the whole buffer (not just the new chunk) so a
      // "\r\n" pair split across two network chunks is still collapsed.
      buffer = (buffer + decoder.decode(value, { stream: true })).replace(
        /\r\n/g,
        "\n",
      );
      while (true) {
        // SSE events are separated by a blank line.
        const boundaryIndex = buffer.indexOf("\n\n");
        if (boundaryIndex < 0) {
          break;
        }
        const eventBlock = buffer.slice(0, boundaryIndex).trim();
        buffer = buffer.slice(boundaryIndex + 2);
        if (!eventBlock) {
          continue;
        }
        const dataLines = eventBlock
          .split("\n")
          .filter((line) => line.startsWith("data:"))
          .map((line) => line.slice(5).trimStart());
        if (!dataLines.length) {
          continue;
        }
        const rawData = dataLines.join("\n").trim();
        if (!rawData) {
          continue;
        }
        if (rawData === "[DONE]") {
          sawStreamEvent = true;
          state.lastEventAt = nowIso();
          break;
        }
        let parsed;
        try {
          parsed = JSON.parse(rawData);
        } catch (error) {
          throw createStreamHandlingError(
            "专用 LLM 返回了无法解析的 SSE 数据块",
            "invalid_sse_chunk",
            {
              fallbackable: true,
            },
          );
        }
        const payloadErrorMessage = extractErrorMessageFromPayload(parsed);
        if (payloadErrorMessage) {
          // Only allow the non-stream fallback when the error looks
          // stream-specific; other provider errors should surface as-is.
          throw createStreamHandlingError(
            payloadErrorMessage,
            "stream_payload_error",
            {
              fallbackable:
                looksLikeStreamUnsupportedMessage(payloadErrorMessage),
            },
          );
        }
        sawStreamEvent = true;
        state.chunkCount += 1;
        state.lastEventAt = nowIso();
        const deltaText = extractStreamingContentDelta(parsed);
        const reasoningDelta = extractStreamingReasoningDelta(parsed);
        const nextFinishReason = extractStreamingFinishReason(parsed);
        if (deltaText) {
          content += deltaText;
          state.receivedChars += deltaText.length;
          state.previewText = appendStreamPreview(state.previewText, deltaText);
        }
        if (reasoningDelta) {
          reasoningContent += reasoningDelta;
        }
        if (nextFinishReason) {
          finishReason = nextFinishReason;
          state.finishReason = nextFinishReason;
        }
        // Throttled progress update (forced updates bracket the stream).
        recordTaskLlmStreamState(taskKey, state, {});
      }
    }
    // Flush the decoder; any trailing partial event is intentionally dropped.
    buffer += decoder.decode();
    if (!sawStreamEvent) {
      throw createStreamHandlingError(
        "专用 LLM 未返回可识别的 SSE 事件流",
        "invalid_sse_stream",
      );
    }
    state.active = false;
    state.completed = true;
    state.finishedAt = nowIso();
    if (finishReason) {
      state.finishReason = finishReason;
    }
    recordTaskLlmStreamState(taskKey, state, {}, { force: true });
    return {
      content: String(content || "").trim(),
      finishReason: String(finishReason || ""),
      reasoningContent: String(reasoningContent || ""),
      raw: {
        mode: "stream",
        chunkCount: state.chunkCount,
      },
    };
  } catch (error) {
    state.active = false;
    state.completed = false;
    state.finishedAt = nowIso();
    if (isAbortError(error)) {
      state.finishReason = "aborted";
    }
    recordTaskLlmStreamState(taskKey, state, {}, { force: true });
    throw error;
  } finally {
    try {
      reader.releaseLock?.();
    } catch {
      // ignore
    }
  }
}
/**
 * POST the dedicated-LLM request to the chat-completions proxy.
 * On a 400 that looks like a structured-output rejection, disables JSON
 * mode module-wide and retries once without `custom_include_body`.
 * Streaming bodies are handed to the SSE parser; stream-specific HTTP
 * failures are rethrown as fallbackable StreamHandlingErrors.
 */
async function executeDedicatedRequest(
  body,
  {
    signal,
    timeoutMs = LLM_REQUEST_TIMEOUT_MS,
    jsonMode = false,
    taskKey = "",
    streamState = null,
  } = {},
) {
  const requestBody = cloneRuntimeDebugValue(body, {}) || {};
  for (;;) {
    recordTaskLlmRequest(taskKey, { requestBody }, { merge: true });
    const response = await fetchWithTimeout(
      "/api/backends/chat-completions/generate",
      {
        method: "POST",
        headers: getRequestHeaders(),
        body: JSON.stringify(requestBody),
        signal,
      },
      timeoutMs,
    );
    const wantsStream = requestBody.stream === true;
    if (response.ok) {
      if (wantsStream) {
        return await parseDedicatedStreamingResponse(response, {
          taskKey,
          streamState,
        });
      }
      return await _parseResponse(response);
    }
    const responseText = await response.text().catch(() => "");
    const message = buildResponseErrorMessage(response, responseText);
    const jsonModeRejected =
      jsonMode &&
      _jsonModeSupported &&
      response.status === 400 &&
      looksLikeJsonModeUnsupportedMessage(message);
    if (jsonModeRejected) {
      console.warn("[ST-BME] API 不支持 json mode降级为普通 JSON 提示模式");
      // Remember the rejection so later requests skip JSON mode entirely,
      // then retry this one without the structured-output extras.
      _jsonModeSupported = false;
      delete requestBody.custom_include_body;
      continue;
    }
    if (wantsStream && looksLikeStreamUnsupportedMessage(message)) {
      throw createStreamHandlingError(
        message || `Memory LLM proxy error ${response.status}`,
        "stream_http_error",
        {
          status: response.status,
        },
      );
    }
    throw new Error(
      `Memory LLM proxy error ${response.status}: ${message || response.statusText}`,
    );
  }
}
async function callDedicatedOpenAICompatible(
messages,
{
@@ -663,6 +1104,13 @@ async function callDedicatedOpenAICompatible(
filtered: {},
removed: [],
};
const taskKey = taskType || privateRequestSource;
const initialFilteredGeneration = generationResolved.filtered || {};
const streamRequested =
hasDedicatedConfig && initialFilteredGeneration.stream === true;
const streamState = createStreamDebugState({
requested: streamRequested,
});
recordTaskLlmRequest(taskType || privateRequestSource, {
requestSource: privateRequestSource,
taskType: String(taskType || "").trim(),
@@ -684,6 +1132,7 @@ async function callDedicatedOpenAICompatible(
taskType,
),
maxCompletionTokens,
...buildStreamDebugSnapshot(streamState),
});
if (!hasDedicatedConfig) {
const payload = await sendOpenAIRequest(
@@ -759,7 +1208,7 @@ async function callDedicatedOpenAICompatible(
body[field] = filteredGeneration[field];
}
if (jsonMode) {
if (jsonMode && _jsonModeSupported) {
body.custom_include_body = buildYamlObject({
response_format: {
type: "json_object",
@@ -767,7 +1216,7 @@ async function callDedicatedOpenAICompatible(
});
}
recordTaskLlmRequest(taskType || privateRequestSource, {
recordTaskLlmRequest(taskKey, {
requestSource: privateRequestSource,
taskType: String(taskType || "").trim(),
jsonMode,
@@ -787,43 +1236,54 @@ async function callDedicatedOpenAICompatible(
taskType,
),
requestBody: body,
...buildStreamDebugSnapshot(streamState),
});
const response = await fetchWithTimeout(
"/api/backends/chat-completions/generate",
{
method: "POST",
headers: getRequestHeaders(),
body: JSON.stringify(body),
try {
return await executeDedicatedRequest(body, {
signal,
},
config.timeoutMs,
);
timeoutMs: config.timeoutMs,
jsonMode,
taskKey,
streamState,
});
} catch (error) {
if (
!streamRequested ||
!shouldFallbackToNonStream(error) ||
isAbortError(error)
) {
throw error;
}
// 如果 400 且带了 structured output可能是 API 不支持,降级重试
if (
!response.ok &&
response.status === 400 &&
jsonMode &&
_jsonModeSupported
) {
console.warn("[ST-BME] API 不支持 json mode降级为普通 JSON 提示模式");
_jsonModeSupported = false;
delete body.custom_include_body;
const retryResponse = await fetchWithTimeout(
"/api/backends/chat-completions/generate",
{
method: "POST",
headers: getRequestHeaders(),
body: JSON.stringify(body),
signal,
},
config.timeoutMs,
streamState.active = false;
streamState.completed = false;
streamState.fallback = true;
streamState.fallbackReason = error?.message || String(error);
streamState.finishedAt = nowIso();
recordTaskLlmStreamState(taskKey, streamState, {}, { force: true });
console.warn(
`[ST-BME] 专用 LLM 流式不可用,已自动降级为非流式: ${streamState.fallbackReason}`,
);
return await _parseResponse(retryResponse);
}
return await _parseResponse(response);
const fallbackBody = {
...body,
stream: false,
};
const fallbackResponse = await executeDedicatedRequest(fallbackBody, {
signal,
timeoutMs: config.timeoutMs,
jsonMode,
taskKey,
streamState,
});
streamState.fallbackSucceeded = true;
recordTaskLlmStreamState(taskKey, streamState, {}, { force: true });
return fallbackResponse;
}
}
async function _parseResponse(response) {