feat(extract): add split pipeline skeleton

This commit is contained in:
youzini
2026-06-09 04:47:31 +00:00
parent 8a5b9dc98c
commit bef75e87a7
4 changed files with 450 additions and 29 deletions

View File

@@ -1109,6 +1109,60 @@ async function applyExtractionPostCommit({
};
}
/**
 * Reads the configured extraction pipeline version as a trimmed, lower-cased string.
 *
 * @param {object} [settings] - Settings bag; only `extractPipelineVersion` is read.
 * @returns {string} The normalized version; "legacy-single" when the setting is falsy.
 */
function resolveExtractPipelineVersion(settings = {}) {
  const configuredVersion = settings?.extractPipelineVersion;
  // Any falsy value (undefined, null, "", 0) selects the legacy default.
  const rawVersion = configuredVersion ? configuredVersion : "legacy-single";
  return String(rawVersion).trim().toLowerCase();
}
/**
 * Tells whether the given settings select the split extraction pipeline.
 *
 * @param {object} [settings] - Settings bag forwarded to resolveExtractPipelineVersion.
 * @returns {boolean} True only when the normalized pipeline version is "split-v1".
 */
function shouldUseSplitExtractionPipeline(settings = {}) {
  const pipelineVersion = resolveExtractPipelineVersion(settings);
  return pipelineVersion === "split-v1";
}
/**
 * Builds a working copy of a normalized extraction result so that stage filters
 * can rewrite its collections without touching the caller's object.
 *
 * Top-level properties are copied with object spread. `operations` and
 * `cognitionUpdates` always come back as fresh arrays of shallow-copied entries
 * (an empty array when the source value is not an array). `regionUpdates` is
 * shallow-copied when it is an array or a plain object; any other value
 * (undefined, null, primitives, class instances) is passed through unchanged.
 *
 * Copies are one level deep: values nested inside an entry are still shared.
 *
 * @param {object} [result] - Normalized extraction result to copy.
 * @returns {object} The copy described above.
 */
function cloneNormalizedExtractionResult(result = {}) {
  const cloneEntries = (entries) =>
    Array.isArray(entries) ? entries.map((entry) => ({ ...entry })) : [];

  const isPlainObject = (value) => {
    if (value === null || typeof value !== "object") return false;
    const prototype = Object.getPrototypeOf(value);
    return prototype === Object.prototype || prototype === null;
  };

  const sourceRegionUpdates = result?.regionUpdates;
  let regionUpdates = sourceRegionUpdates;
  if (Array.isArray(sourceRegionUpdates)) {
    regionUpdates = sourceRegionUpdates.map((item) => ({ ...item }));
  } else if (isPlainObject(sourceRegionUpdates)) {
    // Object-shaped region updates used to be aliased; copy them so a write to
    // the clone cannot leak back into the source result.
    regionUpdates = { ...sourceRegionUpdates };
  }

  return {
    ...result,
    operations: cloneEntries(result?.operations),
    cognitionUpdates: cloneEntries(result?.cognitionUpdates),
    regionUpdates,
  };
}
/**
 * Produces the objective-stage view of an extraction result.
 *
 * Works on a copy from cloneNormalizedExtractionResult: every operation whose
 * type is "pov_memory" is dropped and `cognitionUpdates` is emptied.
 *
 * @param {object} [result] - Normalized extraction result of the objective stage.
 * @returns {object} The filtered copy.
 */
function filterObjectiveExtractionResult(result = {}) {
  const objectiveResult = cloneNormalizedExtractionResult(result);
  const isObjectiveOperation = (operation) =>
    String(operation?.type || "") !== "pov_memory";
  return Object.assign(objectiveResult, {
    operations: objectiveResult.operations.filter(isObjectiveOperation),
    cognitionUpdates: [],
  });
}
/**
 * Produces the subjective-stage view of an extraction result.
 *
 * Works on a copy from cloneNormalizedExtractionResult: only operations whose
 * type is "pov_memory" are kept, `regionUpdates` is reset to an empty object and
 * `batchStoryTime` to null. `cognitionUpdates` are left as copied.
 *
 * @param {object} [result] - Normalized extraction result of the subjective stage.
 * @returns {object} The filtered copy.
 */
function filterSubjectiveExtractionResult(result = {}) {
  const subjectiveResult = cloneNormalizedExtractionResult(result);
  const isPovMemory = (operation) =>
    String(operation?.type || "") === "pov_memory";
  return Object.assign(subjectiveResult, {
    operations: subjectiveResult.operations.filter(isPovMemory),
    regionUpdates: {},
    batchStoryTime: null,
  });
}
/**
 * Combines the objective and subjective stage results into one result.
 *
 * The objective result is the base: all of its own properties are carried over.
 * `operations` and `cognitionUpdates` are the objective entries followed by the
 * subjective entries (a non-array side contributes nothing). `regionUpdates` and
 * `batchStoryTime` are taken from the objective side only, falling back to an
 * empty object and null respectively when falsy.
 *
 * @param {object} [objectiveResult] - Filtered objective-stage result.
 * @param {object} [subjectiveResult] - Filtered subjective-stage result.
 * @returns {object} The merged result.
 */
function mergeSplitExtractionResults(objectiveResult = {}, subjectiveResult = {}) {
  const listOrEmpty = (value) => (Array.isArray(value) ? value : []);
  const joinLists = (key) => [
    ...listOrEmpty(objectiveResult?.[key]),
    ...listOrEmpty(subjectiveResult?.[key]),
  ];

  const merged = { ...objectiveResult };
  merged.operations = joinLists("operations");
  merged.cognitionUpdates = joinLists("cognitionUpdates");
  merged.regionUpdates = objectiveResult?.regionUpdates
    ? objectiveResult.regionUpdates
    : {};
  merged.batchStoryTime = objectiveResult?.batchStoryTime
    ? objectiveResult.batchStoryTime
    : null;
  return merged;
}
/**
* 对未处理的对话楼层执行记忆提取
*
@@ -1418,35 +1472,84 @@ export async function extractMemories({
}
}
// 调用 LLM
const llmResult = await callLLMForJSON({
systemPrompt: llmSystemPrompt,
userPrompt: promptPayload.userPrompt,
maxRetries: 2,
signal,
taskType: "extract",
debugContext: createExtractTaskLlmDebugContext(
promptBuild,
extractRegexInput,
extractionInput?.debug || null,
),
promptMessages: promptPayload.promptMessages,
additionalMessages: promptPayloadAdditionalMessages,
onStreamProgress,
returnFailureDetails: true,
});
throwIfAborted(signal);
const draft = resolveExtractionDraft({
llmResult,
schema,
graph,
scopeRuntime,
});
const validationFailure = validateExtractionDraft({
draft,
lastProcessedSeq,
});
if (validationFailure) return validationFailure;
const callExtractionStage = async (taskType) => {
const stageResult = await callLLMForJSON({
systemPrompt: llmSystemPrompt,
userPrompt: promptPayload.userPrompt,
maxRetries: 2,
signal,
taskType,
debugContext: createExtractTaskLlmDebugContext(
promptBuild,
extractRegexInput,
extractionInput?.debug || null,
),
promptMessages: promptPayload.promptMessages,
additionalMessages: promptPayloadAdditionalMessages,
onStreamProgress,
returnFailureDetails: true,
});
throwIfAborted(signal);
return stageResult;
};
let draft = null;
if (shouldUseSplitExtractionPipeline(settings)) {
const objectiveLlmResult = await callExtractionStage("extract_objective");
const objectiveDraft = resolveExtractionDraft({
llmResult: objectiveLlmResult,
schema,
graph,
scopeRuntime,
});
const objectiveValidationFailure = validateExtractionDraft({
draft: objectiveDraft,
lastProcessedSeq,
});
if (objectiveValidationFailure) return objectiveValidationFailure;
const subjectiveLlmResult = await callExtractionStage("extract_subjective");
const subjectiveDraft = resolveExtractionDraft({
llmResult: subjectiveLlmResult,
schema,
graph,
scopeRuntime,
});
const subjectiveValidationFailure = validateExtractionDraft({
draft: subjectiveDraft,
lastProcessedSeq,
});
if (subjectiveValidationFailure) return subjectiveValidationFailure;
draft = resolveExtractionDraft({
llmResult: mergeSplitExtractionResults(
filterObjectiveExtractionResult(objectiveDraft.normalizedResult),
filterSubjectiveExtractionResult(subjectiveDraft.normalizedResult),
),
schema,
graph,
scopeRuntime,
});
const mergedValidationFailure = validateExtractionDraft({
draft,
lastProcessedSeq,
});
if (mergedValidationFailure) return mergedValidationFailure;
} else {
// 调用 LLM
const llmResult = await callExtractionStage("extract");
draft = resolveExtractionDraft({
llmResult,
schema,
graph,
scopeRuntime,
});
const validationFailure = validateExtractionDraft({
draft,
lastProcessedSeq,
});
if (validationFailure) return validationFailure;
}
const commitResult = commitExtractionPlan({
graph,

View File

@@ -37,6 +37,7 @@ export const defaultSettings = {
extractIncludeStoryTime: true,
extractIncludeSummaries: true,
extractActionMode: "pending",
extractPipelineVersion: "legacy-single",
// 召回设置
recallEnabled: true,

View File

@@ -109,6 +109,7 @@ assert.equal(defaultSettings.loadNativeHydrateThresholdRecords, 30000);
// Pin the default values of the settings listed below. In particular,
// extractPipelineVersion must default to "legacy-single" and both split-stage
// prompt overrides must default to empty strings.
assert.equal(defaultSettings.nativeRolloutVersion, 2);
assert.equal(defaultSettings.nativeEngineFailOpen, true);
assert.equal(defaultSettings.graphNativeForceDisable, false);
assert.equal(defaultSettings.extractPipelineVersion, "legacy-single");
assert.equal(defaultSettings.taskProfilesVersion, 3);
assert.equal(defaultSettings.extractObjectivePrompt, "");
assert.equal(defaultSettings.extractSubjectivePrompt, "");

View File

@@ -0,0 +1,316 @@
import assert from "node:assert/strict";
import {
installResolveHooks,
toDataModuleUrl,
} from "./helpers/register-hooks-compat.mjs";
// Source text of a stand-in "extensions.js" module. It exports an empty
// `extension_settings` object and a `getContext()` that returns
// globalThis.__stBmeTestContext when set, otherwise a fixed default context.
const extensionsShimSource = [
"export const extension_settings = {};",
"export function getContext() {",
" return globalThis.__stBmeTestContext || {",
" chat: [],",
" chatMetadata: {},",
" extensionSettings: {},",
" powerUserSettings: {},",
" characters: {},",
" characterId: null,",
" name1: '玩家',",
" name2: '艾琳',",
" chatId: 'test-chat',",
" };",
"}",
].join("\n");
// Source text of a stand-in "script.js" module: empty request headers and a
// `substituteParamsExtended` that only stringifies its argument (nullish -> '').
const scriptShimSource = [
"export function getRequestHeaders() {",
" return {};",
"}",
"export function substituteParamsExtended(value) {",
" return String(value ?? '');",
"}",
].join("\n");
// Source text of a stand-in "openai.js" module. `sendOpenAIRequest` throws so
// that any attempt to reach it during this test surfaces as a failure.
const openAiShimSource = [
"export const chat_completion_sources = {};",
"export async function sendOpenAIRequest() {",
" throw new Error('sendOpenAIRequest should not be called in extractor-split-pipeline test');",
"}",
].join("\n");
// Pair each shim source with the relative specifiers it stands in for. Several
// "../" depths are listed per module.
// NOTE(review): presumably installResolveHooks makes imports of these specifiers
// resolve to the data-module URL built by toDataModuleUrl — confirm in
// ./helpers/register-hooks-compat.mjs.
installResolveHooks([
{
specifiers: [
"../../../extensions.js",
"../../../../extensions.js",
"../../../../../extensions.js",
],
url: toDataModuleUrl(extensionsShimSource),
},
{
specifiers: [
"../../../../script.js",
"../../../../../script.js",
],
url: toDataModuleUrl(scriptShimSource),
},
{
specifiers: [
"../../../../openai.js",
"../../../../../openai.js",
],
url: toDataModuleUrl(openAiShimSource),
},
]);
// The modules under test are loaded with dynamic import() so that they are
// evaluated only after the installResolveHooks(...) call above has run.
const { createEmptyGraph, createNode, addNode } = await import("../graph/graph.js");
const { DEFAULT_NODE_SCHEMA } = await import("../graph/schema.js");
const { extractMemories } = await import("../maintenance/extractor.js");
/**
 * Installs a test override object on `globalThis.__stBmeTestOverrides`.
 *
 * @param {object} [overrides] - Override object to publish.
 * @returns {() => void} Restore callback. It puts back whatever value was
 *   installed before this call, or removes the global entirely when nothing was
 *   installed, so nested or overlapping override scopes do not wipe each other.
 */
function setTestOverrides(overrides = {}) {
  const hadPrevious = Object.hasOwn(globalThis, "__stBmeTestOverrides");
  const previousOverrides = globalThis.__stBmeTestOverrides;
  globalThis.__stBmeTestOverrides = overrides;
  return () => {
    if (hadPrevious) {
      globalThis.__stBmeTestOverrides = previousOverrides;
    } else {
      delete globalThis.__stBmeTestOverrides;
    }
  };
}
// Context object that the extensions shim's getContext() returns while it is
// set: an empty chat with user name "玩家" and character name "艾琳".
globalThis.__stBmeTestContext = {
chat: [],
chatMetadata: {},
extensionSettings: {},
powerUserSettings: {},
characters: {},
characterId: null,
name1: "玩家",
name2: "艾琳",
chatId: "test-chat",
};
/**
 * Builds a fresh graph that already contains one "character" node named "艾琳"
 * (seq 1), the owner referenced by the payload fixtures in this file.
 *
 * @returns {object} The graph returned by createEmptyGraph, after addNode.
 */
function createGraphWithCharacter() {
  const seededGraph = createEmptyGraph();
  const characterNode = createNode({
    type: "character",
    fields: { name: "艾琳" },
    seq: 1,
  });
  addNode(seededGraph, characterNode);
  return seededGraph;
}
// Arguments shared by every extractMemories call in this file: a two-message
// batch (seq 20 from the user, seq 21 from the assistant) covering
// startSeq..endSeq 20..21, the default node schema, and no embedding config.
const baseExtractParams = {
messages: [
{ seq: 20, role: "user", content: "钟楼里传来第二次钟声。", name: "玩家", speaker: "玩家" },
{ seq: 21, role: "assistant", content: "艾琳记下钟声,怀疑暗道就在附近。", name: "艾琳", speaker: "艾琳" },
],
startSeq: 20,
endSeq: 21,
schema: DEFAULT_NODE_SCHEMA,
embeddingConfig: null,
};
/**
 * Canned LLM reply for the objective stage: one "event" create operation with
 * ref "evt-clock", one cognition update for the character "艾琳" that references
 * that event, and empty region updates.
 *
 * @returns {object} A freshly built payload on every call.
 */
function objectivePayload() {
  const clockEventOperation = {
    action: "create",
    type: "event",
    ref: "evt-clock",
    fields: {
      title: "钟楼钟声",
      summary: "钟楼传来第二次钟声,暗示暗道线索仍在附近。",
      participants: "玩家,艾琳",
      status: "ongoing",
    },
    scope: { layer: "objective" },
  };
  const characterCognition = {
    ownerType: "character",
    ownerName: "艾琳",
    knownRefs: ["evt-clock"],
  };
  const payload = {};
  payload.operations = [clockEventOperation];
  payload.cognitionUpdates = [characterCognition];
  payload.regionUpdates = {};
  return payload;
}
/**
 * Canned LLM reply for the subjective stage: one "pov_memory" create operation
 * owned by the character "艾琳" and about "evt-clock", one cognition update for
 * the same character, and empty region updates.
 *
 * @returns {object} A freshly built payload on every call.
 */
function subjectivePayload() {
  const povMemoryOperation = {
    action: "create",
    type: "pov_memory",
    fields: {
      summary: "艾琳把第二次钟声记成暗道仍在呼唤她的证据。",
      belief: "暗道就在钟楼附近",
      emotion: "警觉",
      certainty: "unsure",
      about: "evt-clock",
    },
    scope: {
      layer: "pov",
      ownerType: "character",
      ownerName: "艾琳",
      ownerId: "艾琳",
    },
  };
  const characterCognition = {
    ownerType: "character",
    ownerName: "艾琳",
    knownRefs: ["evt-clock"],
  };
  const payload = {};
  payload.operations = [povMemoryOperation];
  payload.cognitionUpdates = [characterCognition];
  payload.regionUpdates = {};
  return payload;
}
/**
 * Lists the graph nodes of one type that are not archived.
 *
 * @param {{nodes: object[]}} graph - Graph whose `nodes` array is scanned.
 * @param {string} type - Node type to match exactly.
 * @returns {object[]} Matching nodes whose `archived` is not strictly true.
 */
function activeNodes(graph, type) {
  const isActiveOfType = (candidate) => {
    if (candidate.type !== type) return false;
    return candidate.archived !== true;
  };
  return graph.nodes.filter(isActiveOfType);
}
/**
 * Checks whether two nodes are joined by a live edge in either direction.
 *
 * An edge with a truthy `invalidAt` or `expiredAt` is ignored.
 *
 * @param {{edges: object[]}} graph - Graph whose `edges` array is scanned.
 * @param {*} leftId - Id of one endpoint.
 * @param {*} rightId - Id of the other endpoint.
 * @returns {boolean} True when at least one live edge connects the two ids.
 */
function hasActiveEdgeBetween(graph, leftId, rightId) {
  const isLive = (edge) => !(edge.invalidAt || edge.expiredAt);
  const joinsPair = (edge) => {
    const forward = edge.fromId === leftId && edge.toId === rightId;
    const backward = edge.fromId === rightId && edge.toId === leftId;
    return forward || backward;
  };
  return graph.edges.some((edge) => isLive(edge) && joinsPair(edge));
}
/**
 * Collects the knowledge-state owner entries that belong to the character "艾琳".
 *
 * @param {object} graph - Graph whose `knowledgeState.owners` map is read; a
 *   missing map is treated as empty.
 * @returns {object[]} Entries whose ownerType is "character" and ownerName is "艾琳".
 */
function characterKnowledgeEntries(graph) {
  const owners = graph.knowledgeState?.owners || {};
  const matches = [];
  for (const entry of Object.values(owners)) {
    const ownerType = String(entry?.ownerType || "");
    const ownerName = String(entry?.ownerName || "");
    if (ownerType === "character" && ownerName === "艾琳") {
      matches.push(entry);
    }
  }
  return matches;
}
// split-v1 calls objective then subjective, merges both stage outputs, and commits once.
// Scenario: the stubbed LLM answers each stage with its canned payload; the test
// then checks call order, the committed nodes, the edge between them, and the
// character's knowledge entry.
{
const graph = createGraphWithCharacter();
const capturedTaskTypes = [];
// The stub records every taskType it is asked for and routes by taskType.
// NOTE(review): presumably the extractor's LLM layer reads
// globalThis.__stBmeTestOverrides.llm.callLLMForJSON in place of a real model
// call — confirm in the LLM module.
const restore = setTestOverrides({
llm: {
async callLLMForJSON(payload = {}) {
capturedTaskTypes.push(payload.taskType);
if (payload.taskType === "extract_objective") return objectivePayload();
if (payload.taskType === "extract_subjective") return subjectivePayload();
return { operations: [], cognitionUpdates: [], regionUpdates: {} };
},
},
});
try {
const result = await extractMemories({
graph,
...baseExtractParams,
settings: { extractPipelineVersion: "split-v1" },
});
// Exactly two calls, objective first; the legacy "extract" taskType must not appear.
assert.deepEqual(
capturedTaskTypes,
["extract_objective", "extract_subjective"],
"split-v1 should call the LLM once for objective extraction, then once for subjective extraction",
);
assert.equal(result.success, true);
assert.equal(result.newNodes, 2, "objective event and subjective POV memory should be committed together");
const [eventNode] = activeNodes(graph, "event");
const [povNode] = activeNodes(graph, "pov_memory");
assert.ok(eventNode, "objective event operation should be committed");
assert.ok(povNode, "subjective pov_memory operation should be committed");
assert.equal(povNode.scope?.ownerType, "character");
assert.equal(povNode.scope?.ownerName, "艾琳");
// 21 is the endSeq of the processed batch.
assert.equal(graph.lastProcessedSeq, 21);
assert.ok(
hasActiveEdgeBetween(graph, eventNode.id, povNode.id),
"merged split stages should be committed as one batch so default batch edges see both nodes",
);
// The cognition update names the event by ref ("evt-clock"); the stored entry
// must hold the id of the node actually created for it.
const knowledgeEntry = characterKnowledgeEntries(graph).find((entry) =>
Array.isArray(entry.knownNodeIds) && entry.knownNodeIds.includes(eventNode.id),
);
assert.ok(
knowledgeEntry,
"subjective cognitionUpdates should apply through the merged ref map",
);
} finally {
// Always remove the overrides so later scenarios start clean.
restore();
}
}
// Invalid subjective output fails the split extraction before any objective-only commit mutates the graph.
// Scenario: the objective stage answers normally, the subjective stage answers
// with an object that has no `operations`; the graph must be left untouched.
{
const graph = createGraphWithCharacter();
// Baselines taken before the call so the assertions below detect any mutation.
const initialNodeCount = graph.nodes.length;
const initialEdgeCount = graph.edges.length;
const capturedTaskTypes = [];
const restore = setTestOverrides({
llm: {
async callLLMForJSON(payload = {}) {
capturedTaskTypes.push(payload.taskType);
if (payload.taskType === "extract_objective") return objectivePayload();
if (payload.taskType === "extract_subjective") return { thought: "missing operations" };
return { thought: "legacy path should not be used for split-v1" };
},
},
});
try {
const result = await extractMemories({
graph,
...baseExtractParams,
settings: { extractPipelineVersion: "split-v1" },
});
// Both stages are still called: the failure is only detected on the second reply.
assert.deepEqual(
capturedTaskTypes,
["extract_objective", "extract_subjective"],
"split-v1 should validate both objective and subjective payloads before commit",
);
assert.equal(result.success, false);
assert.equal(graph.nodes.length, initialNodeCount, "invalid subjective payload should not commit objective nodes");
assert.equal(graph.edges.length, initialEdgeCount, "invalid subjective payload should not create edges");
// A missing (nullish) lastProcessedSeq is treated the same as -1 here.
assert.equal(graph.lastProcessedSeq ?? -1, -1, "invalid split extraction should not advance extraction progress");
} finally {
// Always remove the overrides so later scenarios start clean.
restore();
}
}
// Legacy/default extraction keeps the single extract taskType path.
// Scenario: with empty settings (no extractPipelineVersion) the LLM must be
// called exactly once, with taskType "extract".
{
const graph = createGraphWithCharacter();
const capturedTaskTypes = [];
// The stub answers every call with an empty but well-formed payload.
const restore = setTestOverrides({
llm: {
async callLLMForJSON(payload = {}) {
capturedTaskTypes.push(payload.taskType);
return { operations: [], cognitionUpdates: [], regionUpdates: {} };
},
},
});
try {
const result = await extractMemories({
graph,
...baseExtractParams,
settings: {},
});
assert.equal(result.success, true);
assert.deepEqual(
capturedTaskTypes,
["extract"],
"default extraction should keep calling only legacy taskType extract",
);
} finally {
// Always remove the overrides so later scenarios start clean.
restore();
}
}
console.log("extractor-split-pipeline tests passed");