From 2304a6a8215d13389947ea65cda3f84c5ab49f24 Mon Sep 17 00:00:00 2001 From: youzini Date: Thu, 11 Jun 2026 04:41:39 +0000 Subject: [PATCH] feat(extraction): run split stages in parallel --- maintenance/extractor.js | 82 ++++++++++--- runtime/settings-defaults.js | 1 + tests/extractor-split-pipeline.mjs | 190 ++++++++++++++++++++++++++++- ui/panel.html | 15 +++ ui/panel.js | 13 ++ 5 files changed, 283 insertions(+), 18 deletions(-) diff --git a/maintenance/extractor.js b/maintenance/extractor.js index 532e35f..7d68f60 100644 --- a/maintenance/extractor.js +++ b/maintenance/extractor.js @@ -53,6 +53,13 @@ import { } from "../runtime/user-alias-utils.js"; import { buildNodeVectorText, isDirectVectorConfig } from "../vector/vector-index.js"; +const VALID_EXTRACTION_OPERATION_ACTIONS = new Set([ + "create", + "update", + "delete", + "_skip", +]); + function createAbortError(message = "操作已终止") { const error = new Error(message); error.name = "AbortError"; @@ -923,6 +930,22 @@ function validateExtractionDraft({ processedRange: [lastProcessedSeq, lastProcessedSeq], }; } + const invalidOperation = normalizedResult.operations.find( + (op) => !VALID_EXTRACTION_OPERATION_ACTIONS.has(String(op?.action || "")), + ); + if (invalidOperation) { + const message = `[ST-BME] 未知操作类型: ${invalidOperation?.action ?? ""}`; + console.warn(message, invalidOperation); + return { + success: false, + error: message, + newNodes: 0, + updatedNodes: 0, + newEdges: 0, + newNodeIds: [], + processedRange: [lastProcessedSeq, lastProcessedSeq], + }; + } return null; } @@ -1448,26 +1471,53 @@ export async function extractMemories({ return stageResult; }; - const objectiveLlmResult = await buildAndCallStageForSplit("extract_objective"); - const objectiveDraft = resolveExtractionDraft({ - llmResult: objectiveLlmResult, - schema, - graph, - scopeRuntime, - }); - const objectiveValidationFailure = validateExtractionDraft({ - draft: objectiveDraft, - lastProcessedSeq, - }); - if (objectiveValidationFailure) return objectiveValidationFailure; - - const filteredObjectiveResult = filterObjectiveExtractionResult(objectiveDraft.normalizedResult); - const subjectiveLlmResult = await buildAndCallStageForSplit("extract_subjective", { + const subjectiveStageOptions = { ownerContext: { activeCharacterOwner: scopeRuntime.activeCharacterOwner || "", activeUserOwner: scopeRuntime.activeUserOwner || "", }, - }); + }; + const isSerialSplit = settings.extractSplitExecutionMode === "serial"; + + let objectiveLlmResult; + let subjectiveLlmResult; + let filteredObjectiveResult; + if (isSerialSplit) { + objectiveLlmResult = await buildAndCallStageForSplit("extract_objective"); + const objectiveDraft = resolveExtractionDraft({ + llmResult: objectiveLlmResult, + schema, + graph, + scopeRuntime, + }); + const objectiveValidationFailure = validateExtractionDraft({ + draft: objectiveDraft, + lastProcessedSeq, + }); + if (objectiveValidationFailure) return objectiveValidationFailure; + filteredObjectiveResult = filterObjectiveExtractionResult(objectiveDraft.normalizedResult); + subjectiveLlmResult = await buildAndCallStageForSplit( + "extract_subjective", + subjectiveStageOptions, + ); + } else { + [objectiveLlmResult, subjectiveLlmResult] = await Promise.all([ + buildAndCallStageForSplit("extract_objective"), + buildAndCallStageForSplit("extract_subjective", subjectiveStageOptions), + ]); + const objectiveDraft = resolveExtractionDraft({ + llmResult: objectiveLlmResult, + schema, + graph, + scopeRuntime, + }); + const objectiveValidationFailure = validateExtractionDraft({ + draft: objectiveDraft, + lastProcessedSeq, + }); + if (objectiveValidationFailure) return objectiveValidationFailure; + filteredObjectiveResult = filterObjectiveExtractionResult(objectiveDraft.normalizedResult); + } const subjectiveDraft = resolveExtractionDraft({ llmResult: subjectiveLlmResult, schema, diff --git a/runtime/settings-defaults.js b/runtime/settings-defaults.js index ec15a7b..44f4af2 100644 --- a/runtime/settings-defaults.js +++ b/runtime/settings-defaults.js @@ -33,6 +33,7 @@ export const defaultSettings = { extractAssistantExcludeRules: [], extractRecentMessageCap: 0, extractPromptStructuredMode: "both", + extractSplitExecutionMode: "parallel", extractWorldbookMode: "active", extractIncludeStoryTime: true, extractIncludeSummaries: true, diff --git a/tests/extractor-split-pipeline.mjs b/tests/extractor-split-pipeline.mjs index d8f4445..e9751d3 100644 --- a/tests/extractor-split-pipeline.mjs +++ b/tests/extractor-split-pipeline.mjs @@ -302,7 +302,7 @@ for (const legacyPatch of [ assert.equal(capturedTaskTypes.includes("extract"), false); } -// split-v1 calls objective then subjective, merges both stage outputs, and commits once. +// split-v1 calls objective and subjective, merges both stage outputs, and commits once. { const graph = createGraphWithCharacter(); const capturedTaskTypes = []; @@ -327,7 +327,7 @@ for (const legacyPatch of [ assert.deepEqual( capturedTaskTypes, ["extract_objective", "extract_subjective"], - "split-v1 should call the LLM once for objective extraction, then once for subjective extraction", + "split-v1 should call the LLM once for objective extraction and once for subjective extraction", ); assert.equal(result.success, true); assert.equal(result.newNodes, 2, "objective event and subjective POV memory should be committed together"); @@ -394,4 +394,190 @@ for (const legacyPatch of [ } } +// Parallel mode (default): objective and subjective start before either resolves. +{ + let objectiveStarted = false; + let subjectiveStarted = false; + let objectiveResolve; + const objectivePromise = new Promise((resolve) => { objectiveResolve = resolve; }); + let subjectiveResolve; + const subjectivePromise = new Promise((resolve) => { subjectiveResolve = resolve; }); + + const startOrder = []; + const restore = setTestOverrides({ + llm: { + async callLLMForJSON(payload = {}) { + if (payload.taskType === "extract_objective") { + objectiveStarted = true; + startOrder.push("objective_start"); + await objectivePromise; + return objectivePayload(); + } + if (payload.taskType === "extract_subjective") { + subjectiveStarted = true; + startOrder.push("subjective_start"); + await subjectivePromise; + return subjectivePayload(); + } + return { operations: [], cognitionUpdates: [], regionUpdates: {} }; + }, + }, + }); + + try { + const graph = createGraphWithCharacter(); + const extractPromise = extractMemories({ + graph, + ...baseExtractParams, + settings: { ...defaultSettings, extractSplitExecutionMode: "parallel" }, + }); + + // Let the event loop turn so both stages can start. + await new Promise((r) => setTimeout(r, 0)); + + // Both should have started before we resolve either. + assert.ok(objectiveStarted, "parallel: objective should have started"); + assert.ok(subjectiveStarted, "parallel: subjective should have started before objective resolved"); + assert.ok( + startOrder.includes("objective_start") && startOrder.includes("subjective_start"), + "parallel: both stages should have started concurrently", + ); + + // Now resolve both to let extraction finish. + objectiveResolve(); + subjectiveResolve(); + const result = await extractPromise; + assert.equal(result.success, true, "parallel extraction should succeed after both stages complete"); + } finally { + restore(); + } +} + +// Serial mode preserves the old escape behavior: invalid objective output does not start subjective. +{ + const graph = createGraphWithCharacter(); + const initialNodeCount = graph.nodes.length; + const capturedTaskTypes = []; + const restore = setTestOverrides({ + llm: { + async callLLMForJSON(payload = {}) { + capturedTaskTypes.push(payload.taskType); + if (payload.taskType === "extract_objective") return { thought: "missing operations" }; + if (payload.taskType === "extract_subjective") return subjectivePayload(); + return { operations: [], cognitionUpdates: [], regionUpdates: {} }; + }, + }, + }); + + try { + const result = await extractMemories({ + graph, + ...baseExtractParams, + settings: { ...defaultSettings, extractSplitExecutionMode: "serial" }, + }); + + assert.deepEqual( + capturedTaskTypes, + ["extract_objective"], + "serial: invalid objective output should not start subjective extraction", + ); + assert.equal(result.success, false); + assert.equal(graph.nodes.length, initialNodeCount, "serial objective failure should not commit nodes"); + } finally { + restore(); + } +} + +// Serial mode: subjective does not start until objective resolves. +{ + let objectiveStarted = false; + let subjectiveStarted = false; + let objectiveResolve; + const objectivePromise = new Promise((resolve) => { objectiveResolve = resolve; }); + + const restore = setTestOverrides({ + llm: { + async callLLMForJSON(payload = {}) { + if (payload.taskType === "extract_objective") { + objectiveStarted = true; + await objectivePromise; + return objectivePayload(); + } + if (payload.taskType === "extract_subjective") { + subjectiveStarted = true; + return subjectivePayload(); + } + return { operations: [], cognitionUpdates: [], regionUpdates: {} }; + }, + }, + }); + + try { + const graph = createGraphWithCharacter(); + const extractPromise = extractMemories({ + graph, + ...baseExtractParams, + settings: { ...defaultSettings, extractSplitExecutionMode: "serial" }, + }); + + // Let the event loop turn. + await new Promise((r) => setTimeout(r, 0)); + + assert.ok(objectiveStarted, "serial: objective should have started"); + assert.ok(!subjectiveStarted, "serial: subjective should NOT have started while objective is pending"); + + // Resolve objective so subjective can proceed. + objectiveResolve(); + const result = await extractPromise; + assert.equal(result.success, true, "serial extraction should succeed after both stages complete sequentially"); + assert.ok(subjectiveStarted, "serial: subjective should have started after objective resolved"); + } finally { + restore(); + } +} + +// Invalid subjective operation action must fail before any valid objective operation mutates the graph. +{ + const graph = createGraphWithCharacter(); + const initialNodeCount = graph.nodes.length; + const initialEdgeCount = graph.edges.length; + const restore = setTestOverrides({ + llm: { + async callLLMForJSON(payload = {}) { + if (payload.taskType === "extract_objective") return objectivePayload(); + if (payload.taskType === "extract_subjective") { + return { + operations: [ + { + action: "nonsense", + type: "pov_memory", + fields: { summary: "非法主观操作不应让客观节点先写入" }, + }, + ], + cognitionUpdates: [], + regionUpdates: {}, + }; + } + return { operations: [], cognitionUpdates: [], regionUpdates: {} }; + }, + }, + }); + + try { + const result = await extractMemories({ + graph, + ...baseExtractParams, + settings: { ...defaultSettings, extractSplitExecutionMode: "parallel" }, + }); + + assert.equal(result.success, false); + assert.match(result.error, /未知操作类型/); + assert.equal(graph.nodes.length, initialNodeCount, "invalid merged action should not partially create objective nodes"); + assert.equal(graph.edges.length, initialEdgeCount, "invalid merged action should not partially create edges"); + assert.equal(graph.lastProcessedSeq ?? -1, -1, "invalid merged action should not advance extraction progress"); + } finally { + restore(); + } +} + console.log("extractor-split-pipeline tests passed"); diff --git a/ui/panel.html b/ui/panel.html index f795aba..5475d87 100644 --- a/ui/panel.html +++ b/ui/panel.html @@ -1837,6 +1837,21 @@
控制 LLM 在提取时看到的是纯 transcript、结构化 recentMessages,还是两者同时提供。通常保持“混合”即可。
+
+ + +
+
+ 默认并发请求客观提取与主观 POV 提取,以降低等待时间;如果提供商限流,可切换为串行。 +
diff --git a/ui/panel.js b/ui/panel.js index be0da3d..b896494 100644 --- a/ui/panel.js +++ b/ui/panel.js @@ -7988,6 +7988,10 @@ function _refreshConfigTab() { "bme-setting-extract-prompt-structured-mode", settings.extractPromptStructuredMode || "both", ); + _setInputValue( + "bme-setting-extract-split-execution-mode", + settings.extractSplitExecutionMode || "parallel", + ); _setInputValue( "bme-setting-extract-worldbook-mode", settings.extractWorldbookMode || "active", @@ -8532,6 +8536,15 @@ function _bindConfigControls() { }); extractStructuredModeEl.dataset.bmeBound = "true"; } + const extractSplitExecutionModeEl = document.getElementById( + "bme-setting-extract-split-execution-mode", + ); + if (extractSplitExecutionModeEl && extractSplitExecutionModeEl.dataset.bmeBound !== "true") { + extractSplitExecutionModeEl.addEventListener("change", () => { + _patchSettings({ extractSplitExecutionMode: extractSplitExecutionModeEl.value || "parallel" }); + }); + extractSplitExecutionModeEl.dataset.bmeBound = "true"; + } const extractWorldbookModeEl = document.getElementById( "bme-setting-extract-worldbook-mode", );