feat(extraction): run split stages in parallel

This commit is contained in:
youzini
2026-06-11 04:41:39 +00:00
parent 8baf819148
commit 2304a6a821
5 changed files with 283 additions and 18 deletions

View File

@@ -53,6 +53,13 @@ import {
} from "../runtime/user-alias-utils.js";
import { buildNodeVectorText, isDirectVectorConfig } from "../vector/vector-index.js";
const VALID_EXTRACTION_OPERATION_ACTIONS = new Set([
"create",
"update",
"delete",
"_skip",
]);
function createAbortError(message = "操作已终止") {
const error = new Error(message);
error.name = "AbortError";
@@ -923,6 +930,22 @@ function validateExtractionDraft({
processedRange: [lastProcessedSeq, lastProcessedSeq],
};
}
const invalidOperation = normalizedResult.operations.find(
(op) => !VALID_EXTRACTION_OPERATION_ACTIONS.has(String(op?.action || "")),
);
if (invalidOperation) {
const message = `[ST-BME] 未知操作类型: ${invalidOperation?.action ?? "<missing>"}`;
console.warn(message, invalidOperation);
return {
success: false,
error: message,
newNodes: 0,
updatedNodes: 0,
newEdges: 0,
newNodeIds: [],
processedRange: [lastProcessedSeq, lastProcessedSeq],
};
}
return null;
}
@@ -1448,26 +1471,53 @@ export async function extractMemories({
return stageResult;
};
const objectiveLlmResult = await buildAndCallStageForSplit("extract_objective");
const objectiveDraft = resolveExtractionDraft({
llmResult: objectiveLlmResult,
schema,
graph,
scopeRuntime,
});
const objectiveValidationFailure = validateExtractionDraft({
draft: objectiveDraft,
lastProcessedSeq,
});
if (objectiveValidationFailure) return objectiveValidationFailure;
const filteredObjectiveResult = filterObjectiveExtractionResult(objectiveDraft.normalizedResult);
const subjectiveLlmResult = await buildAndCallStageForSplit("extract_subjective", {
const subjectiveStageOptions = {
ownerContext: {
activeCharacterOwner: scopeRuntime.activeCharacterOwner || "",
activeUserOwner: scopeRuntime.activeUserOwner || "",
},
});
};
const isSerialSplit = settings.extractSplitExecutionMode === "serial";
let objectiveLlmResult;
let subjectiveLlmResult;
let filteredObjectiveResult;
if (isSerialSplit) {
objectiveLlmResult = await buildAndCallStageForSplit("extract_objective");
const objectiveDraft = resolveExtractionDraft({
llmResult: objectiveLlmResult,
schema,
graph,
scopeRuntime,
});
const objectiveValidationFailure = validateExtractionDraft({
draft: objectiveDraft,
lastProcessedSeq,
});
if (objectiveValidationFailure) return objectiveValidationFailure;
filteredObjectiveResult = filterObjectiveExtractionResult(objectiveDraft.normalizedResult);
subjectiveLlmResult = await buildAndCallStageForSplit(
"extract_subjective",
subjectiveStageOptions,
);
} else {
[objectiveLlmResult, subjectiveLlmResult] = await Promise.all([
buildAndCallStageForSplit("extract_objective"),
buildAndCallStageForSplit("extract_subjective", subjectiveStageOptions),
]);
const objectiveDraft = resolveExtractionDraft({
llmResult: objectiveLlmResult,
schema,
graph,
scopeRuntime,
});
const objectiveValidationFailure = validateExtractionDraft({
draft: objectiveDraft,
lastProcessedSeq,
});
if (objectiveValidationFailure) return objectiveValidationFailure;
filteredObjectiveResult = filterObjectiveExtractionResult(objectiveDraft.normalizedResult);
}
const subjectiveDraft = resolveExtractionDraft({
llmResult: subjectiveLlmResult,
schema,

View File

@@ -33,6 +33,7 @@ export const defaultSettings = {
extractAssistantExcludeRules: [],
extractRecentMessageCap: 0,
extractPromptStructuredMode: "both",
extractSplitExecutionMode: "parallel",
extractWorldbookMode: "active",
extractIncludeStoryTime: true,
extractIncludeSummaries: true,

View File

@@ -302,7 +302,7 @@ for (const legacyPatch of [
assert.equal(capturedTaskTypes.includes("extract"), false);
}
// split-v1 calls objective then subjective, merges both stage outputs, and commits once.
// split-v1 calls objective and subjective, merges both stage outputs, and commits once.
{
const graph = createGraphWithCharacter();
const capturedTaskTypes = [];
@@ -327,7 +327,7 @@ for (const legacyPatch of [
assert.deepEqual(
capturedTaskTypes,
["extract_objective", "extract_subjective"],
"split-v1 should call the LLM once for objective extraction, then once for subjective extraction",
"split-v1 should call the LLM once for objective extraction and once for subjective extraction",
);
assert.equal(result.success, true);
assert.equal(result.newNodes, 2, "objective event and subjective POV memory should be committed together");
@@ -394,4 +394,190 @@ for (const legacyPatch of [
}
}
// Parallel mode (default): objective and subjective start before either resolves.
{
let objectiveStarted = false;
let subjectiveStarted = false;
let objectiveResolve;
const objectivePromise = new Promise((resolve) => { objectiveResolve = resolve; });
let subjectiveResolve;
const subjectivePromise = new Promise((resolve) => { subjectiveResolve = resolve; });
const startOrder = [];
const restore = setTestOverrides({
llm: {
async callLLMForJSON(payload = {}) {
if (payload.taskType === "extract_objective") {
objectiveStarted = true;
startOrder.push("objective_start");
await objectivePromise;
return objectivePayload();
}
if (payload.taskType === "extract_subjective") {
subjectiveStarted = true;
startOrder.push("subjective_start");
await subjectivePromise;
return subjectivePayload();
}
return { operations: [], cognitionUpdates: [], regionUpdates: {} };
},
},
});
try {
const graph = createGraphWithCharacter();
const extractPromise = extractMemories({
graph,
...baseExtractParams,
settings: { ...defaultSettings, extractSplitExecutionMode: "parallel" },
});
// Let the event loop turn so both stages can start.
await new Promise((r) => setTimeout(r, 0));
// Both should have started before we resolve either.
assert.ok(objectiveStarted, "parallel: objective should have started");
assert.ok(subjectiveStarted, "parallel: subjective should have started before objective resolved");
assert.ok(
startOrder.includes("objective_start") && startOrder.includes("subjective_start"),
"parallel: both stages should have started concurrently",
);
// Now resolve both to let extraction finish.
objectiveResolve();
subjectiveResolve();
const result = await extractPromise;
assert.equal(result.success, true, "parallel extraction should succeed after both stages complete");
} finally {
restore();
}
}
// Serial mode preserves the old escape behavior: invalid objective output does not start subjective.
{
const graph = createGraphWithCharacter();
const initialNodeCount = graph.nodes.length;
const capturedTaskTypes = [];
const restore = setTestOverrides({
llm: {
async callLLMForJSON(payload = {}) {
capturedTaskTypes.push(payload.taskType);
if (payload.taskType === "extract_objective") return { thought: "missing operations" };
if (payload.taskType === "extract_subjective") return subjectivePayload();
return { operations: [], cognitionUpdates: [], regionUpdates: {} };
},
},
});
try {
const result = await extractMemories({
graph,
...baseExtractParams,
settings: { ...defaultSettings, extractSplitExecutionMode: "serial" },
});
assert.deepEqual(
capturedTaskTypes,
["extract_objective"],
"serial: invalid objective output should not start subjective extraction",
);
assert.equal(result.success, false);
assert.equal(graph.nodes.length, initialNodeCount, "serial objective failure should not commit nodes");
} finally {
restore();
}
}
// Serial mode: subjective does not start until objective resolves.
{
let objectiveStarted = false;
let subjectiveStarted = false;
let objectiveResolve;
const objectivePromise = new Promise((resolve) => { objectiveResolve = resolve; });
const restore = setTestOverrides({
llm: {
async callLLMForJSON(payload = {}) {
if (payload.taskType === "extract_objective") {
objectiveStarted = true;
await objectivePromise;
return objectivePayload();
}
if (payload.taskType === "extract_subjective") {
subjectiveStarted = true;
return subjectivePayload();
}
return { operations: [], cognitionUpdates: [], regionUpdates: {} };
},
},
});
try {
const graph = createGraphWithCharacter();
const extractPromise = extractMemories({
graph,
...baseExtractParams,
settings: { ...defaultSettings, extractSplitExecutionMode: "serial" },
});
// Let the event loop turn.
await new Promise((r) => setTimeout(r, 0));
assert.ok(objectiveStarted, "serial: objective should have started");
assert.ok(!subjectiveStarted, "serial: subjective should NOT have started while objective is pending");
// Resolve objective so subjective can proceed.
objectiveResolve();
const result = await extractPromise;
assert.equal(result.success, true, "serial extraction should succeed after both stages complete sequentially");
assert.ok(subjectiveStarted, "serial: subjective should have started after objective resolved");
} finally {
restore();
}
}
// Invalid subjective operation action must fail before any valid objective operation mutates the graph.
{
const graph = createGraphWithCharacter();
const initialNodeCount = graph.nodes.length;
const initialEdgeCount = graph.edges.length;
const restore = setTestOverrides({
llm: {
async callLLMForJSON(payload = {}) {
if (payload.taskType === "extract_objective") return objectivePayload();
if (payload.taskType === "extract_subjective") {
return {
operations: [
{
action: "nonsense",
type: "pov_memory",
fields: { summary: "非法主观操作不应让客观节点先写入" },
},
],
cognitionUpdates: [],
regionUpdates: {},
};
}
return { operations: [], cognitionUpdates: [], regionUpdates: {} };
},
},
});
try {
const result = await extractMemories({
graph,
...baseExtractParams,
settings: { ...defaultSettings, extractSplitExecutionMode: "parallel" },
});
assert.equal(result.success, false);
assert.match(result.error, /未知操作类型/);
assert.equal(graph.nodes.length, initialNodeCount, "invalid merged action should not partially create objective nodes");
assert.equal(graph.edges.length, initialEdgeCount, "invalid merged action should not partially create edges");
assert.equal(graph.lastProcessedSeq ?? -1, -1, "invalid merged action should not advance extraction progress");
} finally {
restore();
}
}
console.log("extractor-split-pipeline tests passed");

View File

@@ -1837,6 +1837,21 @@
<div class="bme-config-help">
控制 LLM 在提取时看到的是纯 transcript、结构化 recentMessages还是两者同时提供。通常保持“混合”即可。
</div>
<div class="bme-config-row">
<label for="bme-setting-extract-split-execution-mode"
>客观/主观提取方式</label
>
<select
id="bme-setting-extract-split-execution-mode"
class="bme-config-input"
>
<option value="parallel">并发(默认,更快)</option>
<option value="serial">串行(限流时使用)</option>
</select>
</div>
<div class="bme-config-help">
默认并发请求客观提取与主观 POV 提取,以降低等待时间;如果提供商限流,可切换为串行。
</div>
</div>
</details>
<div class="bme-config-row">

View File

@@ -7988,6 +7988,10 @@ function _refreshConfigTab() {
"bme-setting-extract-prompt-structured-mode",
settings.extractPromptStructuredMode || "both",
);
_setInputValue(
"bme-setting-extract-split-execution-mode",
settings.extractSplitExecutionMode || "parallel",
);
_setInputValue(
"bme-setting-extract-worldbook-mode",
settings.extractWorldbookMode || "active",
@@ -8532,6 +8536,15 @@ function _bindConfigControls() {
});
extractStructuredModeEl.dataset.bmeBound = "true";
}
const extractSplitExecutionModeEl = document.getElementById(
"bme-setting-extract-split-execution-mode",
);
if (extractSplitExecutionModeEl && extractSplitExecutionModeEl.dataset.bmeBound !== "true") {
extractSplitExecutionModeEl.addEventListener("change", () => {
_patchSettings({ extractSplitExecutionMode: extractSplitExecutionModeEl.value || "parallel" });
});
extractSplitExecutionModeEl.dataset.bmeBound = "true";
}
const extractWorldbookModeEl = document.getElementById(
"bme-setting-extract-worldbook-mode",
);