fix(extraction): add SSE stream idle timeout and vector sync timeout to prevent extraction hang

Root cause analysis: extraction can appear stuck at '正在提取xx-xx楼层' indefinitely
when the LLM API connection goes half-open (server stops sending data but keeps the
TCP connection alive). The SSE stream reader.read() would block forever with no
per-chunk idle timeout.

Changes:
1. llm/llm.js: Add LLM_STREAM_IDLE_TIMEOUT_MS (90s default) to
   parseDedicatedStreamingResponse. When no SSE data is received within the idle
   window, the read loop aborts with a clear timeout error instead of hanging
   forever. The idle timeout is configurable per-request: when the request has a
   configured overall timeout, it defaults to 30% of that timeout (minimum 30s);
   otherwise the 90s constant applies.

2. index.js: Add EXTRACTION_VECTOR_SYNC_TIMEOUT_MS (120s) timeout wrapper around
   syncVectorState in handleExtractionSuccess. Vector sync now uses a combined
   AbortSignal (extraction signal + timeout) so that either a user abort or the
   120s timeout will interrupt the sync. A vector-sync timeout is treated as
   non-fatal (it doesn't abort the entire extraction batch).
This commit is contained in:
Youzini-afk
2026-04-29 02:42:50 +08:00
parent cb7ce45572
commit 0749ef29c5
2 changed files with 85 additions and 4 deletions

View File

@@ -1274,6 +1274,7 @@ let isRecoveringHistory = false;
let lastRecallFallbackNoticeAt = 0;
let lastExtractionWarningAt = 0;
const LOCAL_VECTOR_TIMEOUT_MS = 300000;
const EXTRACTION_VECTOR_SYNC_TIMEOUT_MS = 120000;
const STATUS_TOAST_THROTTLE_MS = 1500;
const RECALL_INPUT_RECORD_TTL_MS = 60000;
const TRIVIAL_GENERATION_SKIP_TTL_MS = 60000;
@@ -19953,9 +19954,36 @@ async function handleExtractionSuccess(
"向量同步中",
"正在同步本批提取后的向量索引",
);
vectorSync = await syncVectorState({ signal });
const vectorSyncTimeoutController = new AbortController();
const vectorSyncTimeout = setTimeout(
() => vectorSyncTimeoutController.abort(
new DOMException(
`向量同步超时 (${Math.round(EXTRACTION_VECTOR_SYNC_TIMEOUT_MS / 1000)}s)`,
"AbortError",
),
),
EXTRACTION_VECTOR_SYNC_TIMEOUT_MS,
);
let vectorSyncSignal = vectorSyncTimeoutController.signal;
if (signal) {
try {
if (typeof AbortSignal.any === "function") {
vectorSyncSignal = AbortSignal.any([signal, vectorSyncTimeoutController.signal]);
}
} catch {}
}
try {
vectorSync = await syncVectorState({ signal: vectorSyncSignal });
} finally {
clearTimeout(vectorSyncTimeout);
}
} catch (error) {
if (isAbortError(error)) throw error;
if (isAbortError(error)) {
const isVectorSyncTimeout = error?.name === "AbortError" &&
typeof error?.message === "string" &&
error.message.includes("向量同步超时");
if (!isVectorSyncTimeout) throw error;
}
const message = error?.message || String(error) || "向量同步阶段失败";
setBatchStageOutcome(
status,

View File

@@ -17,6 +17,7 @@ import { applyTaskRegex } from "../prompting/task-regex.js";
const MODULE_NAME = "st_bme";
const LLM_REQUEST_TIMEOUT_MS = 300000;
const LLM_STREAM_IDLE_TIMEOUT_MS = 90000;
const DEFAULT_TEXT_COMPLETION_TOKENS = 64000;
const DEFAULT_JSON_COMPLETION_TOKENS = 64000;
const STREAM_DEBUG_PREVIEW_MAX_CHARS = 1200;
@@ -1640,7 +1641,7 @@ function isAbortError(error) {
async function parseDedicatedStreamingResponse(
response,
{ taskKey = "", streamState = null, onStreamProgress = null } = {},
{ taskKey = "", streamState = null, onStreamProgress = null, idleTimeoutMs = LLM_STREAM_IDLE_TIMEOUT_MS } = {},
) {
const reader = response?.body?.getReader?.();
if (!reader) {
@@ -1663,13 +1664,56 @@ async function parseDedicatedStreamingResponse(
streamState.finishedAt = "";
recordTaskLlmStreamState(taskKey, streamState, {}, { force: true });
let lastChunkAt = Date.now();
const idleAbortController = new AbortController();
let idleTimer = null;
const resetIdleTimer = () => {
lastChunkAt = Date.now();
if (idleTimer) clearTimeout(idleTimer);
idleTimer = setTimeout(() => {
try {
idleAbortController.abort(
new DOMException(
`LLM 流式响应空闲超时 (${Math.round(idleTimeoutMs / 1000)}s 未收到数据)`,
"AbortError",
),
);
} catch {}
}, idleTimeoutMs);
};
const clearIdleTimer = () => {
if (idleTimer) {
clearTimeout(idleTimer);
idleTimer = null;
}
};
resetIdleTimer();
try {
const combinedSignal = idleAbortController.signal;
while (true) {
const { done, value } = await reader.read();
let readResult;
try {
readResult = await reader.read();
} catch (readError) {
if (idleAbortController.signal.aborted) {
throw new Error(
`LLM 流式响应空闲超时 (${Math.round(idleTimeoutMs / 1000)}s 未收到数据)`,
);
}
throw readError;
}
const { done, value } = readResult;
if (done) {
break;
}
resetIdleTimer();
buffer += decoder.decode(value, { stream: true }).replace(/\r\n/g, "\n");
while (true) {
@@ -1801,6 +1845,7 @@ async function parseDedicatedStreamingResponse(
recordTaskLlmStreamState(taskKey, streamState, {}, { force: true });
throw error;
} finally {
clearIdleTimer();
try {
reader.releaseLock?.();
} catch {
@@ -1818,6 +1863,7 @@ async function executeDedicatedRequest(
taskKey = "",
streamState = null,
onStreamProgress = null,
idleTimeoutMs = LLM_STREAM_IDLE_TIMEOUT_MS,
} = {},
) {
const requestBody = cloneRuntimeDebugValue(body, {}) || {};
@@ -1879,6 +1925,7 @@ async function executeDedicatedRequest(
taskKey,
streamState,
onStreamProgress,
idleTimeoutMs,
});
}
@@ -2167,6 +2214,9 @@ async function callDedicatedOpenAICompatible(
taskKey,
streamState,
onStreamProgress,
idleTimeoutMs: config.timeoutMs
? Math.max(Math.floor(config.timeoutMs * 0.3), 30000)
: LLM_STREAM_IDLE_TIMEOUT_MS,
});
} catch (error) {
if (
@@ -2199,6 +2249,9 @@ async function callDedicatedOpenAICompatible(
jsonMode,
taskKey,
streamState,
idleTimeoutMs: config.timeoutMs
? Math.max(Math.floor(config.timeoutMs * 0.3), 30000)
: LLM_STREAM_IDLE_TIMEOUT_MS,
});
streamState.fallbackSucceeded = true;