From 0749ef29c5bfc46caf5441c24a716e8ade326e23 Mon Sep 17 00:00:00 2001 From: Youzini-afk <13153778771cx@gmail.com> Date: Wed, 29 Apr 2026 02:42:50 +0800 Subject: [PATCH] fix(extraction): add SSE stream idle timeout and vector sync timeout to prevent extraction hang MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root cause analysis: extraction can appear stuck at '正在提取xx-xx楼层' indefinitely when the LLM API connection goes half-open (server stops sending data but keeps the TCP connection alive). The SSE stream reader.read() would block forever with no per-chunk idle timeout. Changes: 1. llm/llm.js: Add LLM_STREAM_IDLE_TIMEOUT_MS (90s default) to parseDedicatedStreamingResponse. When no SSE data is received within the idle window, the read loop aborts with a clear timeout error instead of hanging forever. The idle window is configurable per-request: when the request has a configured overall timeout, it is derived as 30% of that timeout with a 30-second floor; otherwise the 90s default applies. 2. index.js: Add EXTRACTION_VECTOR_SYNC_TIMEOUT_MS (120s) timeout wrapper around syncVectorState in handleExtractionSuccess. Vector sync now uses a combined AbortSignal (extraction signal + timeout) so that either a user abort or the 120s timeout will break out of the sync. The vector sync timeout is treated as non-fatal (it does not abort the entire extraction batch). 
--- index.js | 32 ++++++++++++++++++++++++++++-- llm/llm.js | 57 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 85 insertions(+), 4 deletions(-) diff --git a/index.js b/index.js index 170707c..c63156c 100644 --- a/index.js +++ b/index.js @@ -1274,6 +1274,7 @@ let isRecoveringHistory = false; let lastRecallFallbackNoticeAt = 0; let lastExtractionWarningAt = 0; const LOCAL_VECTOR_TIMEOUT_MS = 300000; +const EXTRACTION_VECTOR_SYNC_TIMEOUT_MS = 120000; const STATUS_TOAST_THROTTLE_MS = 1500; const RECALL_INPUT_RECORD_TTL_MS = 60000; const TRIVIAL_GENERATION_SKIP_TTL_MS = 60000; @@ -19953,9 +19954,36 @@ async function handleExtractionSuccess( "向量同步中", "正在同步本批提取后的向量索引", ); - vectorSync = await syncVectorState({ signal }); + const vectorSyncTimeoutController = new AbortController(); + const vectorSyncTimeout = setTimeout( + () => vectorSyncTimeoutController.abort( + new DOMException( + `向量同步超时 (${Math.round(EXTRACTION_VECTOR_SYNC_TIMEOUT_MS / 1000)}s)`, + "AbortError", + ), + ), + EXTRACTION_VECTOR_SYNC_TIMEOUT_MS, + ); + let vectorSyncSignal = vectorSyncTimeoutController.signal; + if (signal) { + try { + if (typeof AbortSignal.any === "function") { + vectorSyncSignal = AbortSignal.any([signal, vectorSyncTimeoutController.signal]); + } + } catch {} + } + try { + vectorSync = await syncVectorState({ signal: vectorSyncSignal }); + } finally { + clearTimeout(vectorSyncTimeout); + } } catch (error) { - if (isAbortError(error)) throw error; + if (isAbortError(error)) { + const isVectorSyncTimeout = error?.name === "AbortError" && + typeof error?.message === "string" && + error.message.includes("向量同步超时"); + if (!isVectorSyncTimeout) throw error; + } const message = error?.message || String(error) || "向量同步阶段失败"; setBatchStageOutcome( status, diff --git a/llm/llm.js b/llm/llm.js index 2e30ef4..be640d9 100644 --- a/llm/llm.js +++ b/llm/llm.js @@ -17,6 +17,7 @@ import { applyTaskRegex } from "../prompting/task-regex.js"; const MODULE_NAME = "st_bme"; const 
LLM_REQUEST_TIMEOUT_MS = 300000; +const LLM_STREAM_IDLE_TIMEOUT_MS = 90000; const DEFAULT_TEXT_COMPLETION_TOKENS = 64000; const DEFAULT_JSON_COMPLETION_TOKENS = 64000; const STREAM_DEBUG_PREVIEW_MAX_CHARS = 1200; @@ -1640,7 +1641,7 @@ function isAbortError(error) { async function parseDedicatedStreamingResponse( response, - { taskKey = "", streamState = null, onStreamProgress = null } = {}, + { taskKey = "", streamState = null, onStreamProgress = null, idleTimeoutMs = LLM_STREAM_IDLE_TIMEOUT_MS } = {}, ) { const reader = response?.body?.getReader?.(); if (!reader) { @@ -1663,13 +1664,56 @@ async function parseDedicatedStreamingResponse( streamState.finishedAt = ""; recordTaskLlmStreamState(taskKey, streamState, {}, { force: true }); + let lastChunkAt = Date.now(); + const idleAbortController = new AbortController(); + let idleTimer = null; + + const resetIdleTimer = () => { + lastChunkAt = Date.now(); + if (idleTimer) clearTimeout(idleTimer); + idleTimer = setTimeout(() => { + try { + idleAbortController.abort( + new DOMException( + `LLM 流式响应空闲超时 (${Math.round(idleTimeoutMs / 1000)}s 未收到数据)`, + "AbortError", + ), + ); + } catch {} + }, idleTimeoutMs); + }; + + const clearIdleTimer = () => { + if (idleTimer) { + clearTimeout(idleTimer); + idleTimer = null; + } + }; + + resetIdleTimer(); + try { + const combinedSignal = idleAbortController.signal; while (true) { - const { done, value } = await reader.read(); + let readResult; + try { + readResult = await reader.read(); + } catch (readError) { + if (idleAbortController.signal.aborted) { + throw new Error( + `LLM 流式响应空闲超时 (${Math.round(idleTimeoutMs / 1000)}s 未收到数据)`, + ); + } + throw readError; + } + + const { done, value } = readResult; if (done) { break; } + resetIdleTimer(); + buffer += decoder.decode(value, { stream: true }).replace(/\r\n/g, "\n"); while (true) { @@ -1801,6 +1845,7 @@ async function parseDedicatedStreamingResponse( recordTaskLlmStreamState(taskKey, streamState, {}, { force: true }); throw error; } 
finally { + clearIdleTimer(); try { reader.releaseLock?.(); } catch { @@ -1818,6 +1863,7 @@ async function executeDedicatedRequest( taskKey = "", streamState = null, onStreamProgress = null, + idleTimeoutMs = LLM_STREAM_IDLE_TIMEOUT_MS, } = {}, ) { const requestBody = cloneRuntimeDebugValue(body, {}) || {}; @@ -1879,6 +1925,7 @@ async function executeDedicatedRequest( taskKey, streamState, onStreamProgress, + idleTimeoutMs, }); } @@ -2167,6 +2214,9 @@ async function callDedicatedOpenAICompatible( taskKey, streamState, onStreamProgress, + idleTimeoutMs: config.timeoutMs + ? Math.max(Math.floor(config.timeoutMs * 0.3), 30000) + : LLM_STREAM_IDLE_TIMEOUT_MS, }); } catch (error) { if ( @@ -2199,6 +2249,9 @@ async function callDedicatedOpenAICompatible( jsonMode, taskKey, streamState, + idleTimeoutMs: config.timeoutMs + ? Math.max(Math.floor(config.timeoutMs * 0.3), 30000) + : LLM_STREAM_IDLE_TIMEOUT_MS, }); streamState.fallbackSucceeded = true;