refactor(extract): stage extraction commit flow

This commit is contained in:
youzini
2026-06-09 04:29:40 +00:00
parent 22f329b8b0
commit 2ca94a7afb

View File

@@ -843,6 +843,272 @@ function applyOperationStoryTimeToNode(
node.storyTimeSpan = createSpanFromStoryTime(null, source);
}
function resolveExtractionDraft({ llmResult, schema, graph, scopeRuntime }) {
const llmFailure =
llmResult && typeof llmResult === "object" && "ok" in llmResult
? llmResult
: null;
const result = llmFailure
? llmFailure.ok
? llmFailure.data
: null
: llmResult;
const normalizedResult = normalizeExtractionResultPayload(result, schema);
const ownershipWarnings = [];
const extractionOwnerContext = deriveExtractionOwnerContext(
graph,
normalizedResult,
scopeRuntime,
);
const normalizedCognitionUpdates = normalizeCognitionUpdatesWithOwnerContext(
graph,
normalizedResult?.cognitionUpdates,
scopeRuntime,
extractionOwnerContext,
ownershipWarnings,
);
return {
llmFailure,
result,
normalizedResult,
ownershipWarnings,
extractionOwnerContext,
normalizedCognitionUpdates,
};
}
function validateExtractionDraft({
draft,
lastProcessedSeq,
}) {
const { result, llmFailure, normalizedResult } = draft;
if (!normalizedResult || !Array.isArray(normalizedResult.operations)) {
const diagType = result === null
? "null"
: Array.isArray(result)
? `array(len=${result.length})`
: typeof result;
const diagKeys = isPlainObject(result)
? Object.keys(result).slice(0, 10).join(", ")
: "";
const diagPreview = typeof result === "string"
? result.slice(0, 120)
: "";
console.warn(
`[ST-BME] 提取 LLM 未返回有效操作 ` +
`[type=${diagType}]` +
(diagKeys ? ` [keys=${diagKeys}]` : "") +
(diagPreview ? ` [preview=${diagPreview}]` : "") +
(llmFailure?.ok === false && llmFailure?.errorType
? ` [failureType=${String(llmFailure.errorType)}]`
: "") +
(llmFailure?.ok === false && llmFailure?.failureReason
? ` [failureReason=${String(llmFailure.failureReason).slice(0, 200)}]`
: ""),
);
const failureReason =
llmFailure?.ok === false
? String(llmFailure.failureReason || "").trim()
: "";
return {
success: false,
error: failureReason
? `提取 LLM 未返回有效操作: ${failureReason}`
: "提取 LLM 未返回有效操作",
newNodes: 0,
updatedNodes: 0,
newEdges: 0,
newNodeIds: [],
processedRange: [lastProcessedSeq, lastProcessedSeq],
};
}
return null;
}
function commitExtractionPlan({
graph,
normalizedResult,
currentSeq,
schema,
scopeRuntime,
extractionOwnerContext,
ownershipWarnings,
effectiveStartSeq,
effectiveEndSeq,
}) {
// 执行操作
const stats = { newNodes: 0, updatedNodes: 0, newEdges: 0 };
const newNodeIds = []; // v2: 收集新建节点 ID用于进化引擎
const updatedNodeIds = [];
const refMap = new Map();
const pendingLinkJobs = [];
const suppressedDefaultPairKeys = new Set();
const operationErrors = [];
const normalizedBatchStoryTime = normalizedResult?.batchStoryTime || null;
for (const op of normalizedResult.operations) {
try {
switch (op.action) {
case "create": {
const createResult = handleCreate(
graph,
op,
currentSeq,
schema,
refMap,
stats,
scopeRuntime,
extractionOwnerContext,
ownershipWarnings,
normalizedBatchStoryTime,
);
if (createResult?.nodeId) {
queueOperationLinks(pendingLinkJobs, createResult.nodeId, op.links);
}
if (createResult?.created === true && createResult.nodeId) {
newNodeIds.push(createResult.nodeId);
}
if (createResult?.updated === true && createResult.nodeId) {
updatedNodeIds.push(createResult.nodeId);
}
break;
}
case "update":
{
const updatedNodeId = handleUpdate(
graph,
op,
currentSeq,
stats,
scopeRuntime,
extractionOwnerContext,
ownershipWarnings,
normalizedBatchStoryTime,
);
if (updatedNodeId) {
updatedNodeIds.push(updatedNodeId);
queueOperationLinks(pendingLinkJobs, updatedNodeId, op.links);
}
}
break;
case "delete":
handleDelete(graph, op, stats);
break;
case "_skip":
// Mem0 对照判定为重复,跳过
break;
default: {
const message = `[ST-BME] 未知操作类型: ${op?.action ?? "<missing>"}`;
console.warn(message, op);
operationErrors.push(message);
break;
}
}
} catch (e) {
console.error(`[ST-BME] 操作执行失败:`, op, e);
operationErrors.push(e?.message || String(e));
}
}
if (operationErrors.length > 0) {
return {
success: false,
error: operationErrors.join(" | "),
...stats,
newNodeIds,
processedRange: [effectiveStartSeq, effectiveEndSeq],
};
}
return {
success: true,
stats,
newNodeIds,
updatedNodeIds,
refMap,
pendingLinkJobs,
suppressedDefaultPairKeys,
normalizedBatchStoryTime,
};
}
async function applyExtractionPostCommit({
graph,
pendingLinkJobs,
refMap,
stats,
settings,
newNodeIds,
updatedNodeIds,
embeddingConfig,
signal,
effectiveEndSeq,
ownershipWarnings,
normalizedCognitionUpdates,
normalizedResult,
normalizedBatchStoryTime,
scopeRuntime,
extractionOwnerContext,
suppressedDefaultPairKeys,
}) {
applyPendingLinks(graph, pendingLinkJobs, refMap, stats, {
suppressedDefaultPairKeys,
});
applyDefaultBatchEdges(
graph,
[...new Set([...newNodeIds, ...updatedNodeIds])],
stats,
settings,
{
suppressedDefaultPairKeys,
},
);
// 为新建节点生成 embedding。失败不应回滚整批图谱写入。
try {
await generateNodeEmbeddings(graph, embeddingConfig, signal);
} catch (error) {
if (isAbortError(error)) {
throw error;
}
console.error("[ST-BME] 节点 embedding 生成失败,保留图谱写入:", error);
}
// 更新处理进度:统一记录为已处理到的末个 chat 索引
graph.lastProcessedSeq = Math.max(
graph.lastProcessedSeq ?? -1,
effectiveEndSeq,
);
const changedNodeIds = [...new Set([...newNodeIds, ...updatedNodeIds])];
if (ownershipWarnings.length > 0) {
debugWarn(
`[ST-BME] 已跳过 ${ownershipWarnings.length} 条缺少具体人物 owner 的主观记忆或认知更新`,
);
}
applyCognitionUpdates(graph, normalizedCognitionUpdates, {
refMap,
changedNodeIds,
scopeRuntime,
source: "extract",
});
applyRegionUpdates(graph, normalizedResult.regionUpdates, {
changedNodeIds,
source: "extract",
});
const batchStoryTimeResult = applyBatchStoryTime(
graph,
normalizedBatchStoryTime,
"extract",
);
updateRuntimeScopeState(graph, newNodeIds, scopeRuntime, extractionOwnerContext);
return {
changedNodeIds,
batchStoryTimeResult,
};
}
/**
* 对未处理的对话楼层执行记忆提取
*
@@ -1170,219 +1436,64 @@ export async function extractMemories({
returnFailureDetails: true,
});
throwIfAborted(signal);
const llmFailure =
llmResult && typeof llmResult === "object" && "ok" in llmResult
? llmResult
: null;
const result = llmFailure
? llmFailure.ok
? llmFailure.data
: null
: llmResult;
const normalizedResult = normalizeExtractionResultPayload(result, schema);
const ownershipWarnings = [];
const extractionOwnerContext = deriveExtractionOwnerContext(
const draft = resolveExtractionDraft({
llmResult,
schema,
graph,
normalizedResult,
scopeRuntime,
);
const normalizedCognitionUpdates = normalizeCognitionUpdatesWithOwnerContext(
graph,
normalizedResult?.cognitionUpdates,
scopeRuntime,
extractionOwnerContext,
ownershipWarnings,
);
if (!normalizedResult || !Array.isArray(normalizedResult.operations)) {
const diagType = result === null
? "null"
: Array.isArray(result)
? `array(len=${result.length})`
: typeof result;
const diagKeys = isPlainObject(result)
? Object.keys(result).slice(0, 10).join(", ")
: "";
const diagPreview = typeof result === "string"
? result.slice(0, 120)
: "";
console.warn(
`[ST-BME] 提取 LLM 未返回有效操作 ` +
`[type=${diagType}]` +
(diagKeys ? ` [keys=${diagKeys}]` : "") +
(diagPreview ? ` [preview=${diagPreview}]` : "") +
(llmFailure?.ok === false && llmFailure?.errorType
? ` [failureType=${String(llmFailure.errorType)}]`
: "") +
(llmFailure?.ok === false && llmFailure?.failureReason
? ` [failureReason=${String(llmFailure.failureReason).slice(0, 200)}]`
: ""),
);
const failureReason =
llmFailure?.ok === false
? String(llmFailure.failureReason || "").trim()
: "";
return {
success: false,
error: failureReason
? `提取 LLM 未返回有效操作: ${failureReason}`
: "提取 LLM 未返回有效操作",
newNodes: 0,
updatedNodes: 0,
newEdges: 0,
newNodeIds: [],
processedRange: [lastProcessedSeq, lastProcessedSeq],
};
}
// 执行操作
const stats = { newNodes: 0, updatedNodes: 0, newEdges: 0 };
const newNodeIds = []; // v2: 收集新建节点 ID用于进化引擎
const updatedNodeIds = [];
const refMap = new Map();
const pendingLinkJobs = [];
const suppressedDefaultPairKeys = new Set();
const operationErrors = [];
const normalizedBatchStoryTime = normalizedResult?.batchStoryTime || null;
for (const op of normalizedResult.operations) {
try {
switch (op.action) {
case "create": {
const createResult = handleCreate(
graph,
op,
currentSeq,
schema,
refMap,
stats,
scopeRuntime,
extractionOwnerContext,
ownershipWarnings,
normalizedBatchStoryTime,
);
if (createResult?.nodeId) {
queueOperationLinks(pendingLinkJobs, createResult.nodeId, op.links);
}
if (createResult?.created === true && createResult.nodeId) {
newNodeIds.push(createResult.nodeId);
}
if (createResult?.updated === true && createResult.nodeId) {
updatedNodeIds.push(createResult.nodeId);
}
break;
}
case "update":
{
const updatedNodeId = handleUpdate(
graph,
op,
currentSeq,
stats,
scopeRuntime,
extractionOwnerContext,
ownershipWarnings,
normalizedBatchStoryTime,
);
if (updatedNodeId) {
updatedNodeIds.push(updatedNodeId);
queueOperationLinks(pendingLinkJobs, updatedNodeId, op.links);
}
}
break;
case "delete":
handleDelete(graph, op, stats);
break;
case "_skip":
// Mem0 对照判定为重复,跳过
break;
default: {
const message = `[ST-BME] 未知操作类型: ${op?.action ?? "<missing>"}`;
console.warn(message, op);
operationErrors.push(message);
break;
}
}
} catch (e) {
console.error(`[ST-BME] 操作执行失败:`, op, e);
operationErrors.push(e?.message || String(e));
}
}
if (operationErrors.length > 0) {
return {
success: false,
error: operationErrors.join(" | "),
...stats,
newNodeIds,
processedRange: [effectiveStartSeq, effectiveEndSeq],
};
}
applyPendingLinks(graph, pendingLinkJobs, refMap, stats, {
suppressedDefaultPairKeys,
});
applyDefaultBatchEdges(
const validationFailure = validateExtractionDraft({
draft,
lastProcessedSeq,
});
if (validationFailure) return validationFailure;
const commitResult = commitExtractionPlan({
graph,
[...new Set([...newNodeIds, ...updatedNodeIds])],
stats,
settings,
{
suppressedDefaultPairKeys,
},
);
// 为新建节点生成 embedding。失败不应回滚整批图谱写入。
try {
await generateNodeEmbeddings(graph, embeddingConfig, signal);
} catch (error) {
if (isAbortError(error)) {
throw error;
}
console.error("[ST-BME] 节点 embedding 生成失败,保留图谱写入:", error);
}
// 更新处理进度:统一记录为已处理到的末个 chat 索引
graph.lastProcessedSeq = Math.max(
graph.lastProcessedSeq ?? -1,
normalizedResult: draft.normalizedResult,
currentSeq,
schema,
scopeRuntime,
extractionOwnerContext: draft.extractionOwnerContext,
ownershipWarnings: draft.ownershipWarnings,
effectiveStartSeq,
effectiveEndSeq,
);
const changedNodeIds = [...new Set([...newNodeIds, ...updatedNodeIds])];
if (ownershipWarnings.length > 0) {
debugWarn(
`[ST-BME] 已跳过 ${ownershipWarnings.length} 条缺少具体人物 owner 的主观记忆或认知更新`,
);
}
applyCognitionUpdates(graph, normalizedCognitionUpdates, {
refMap,
changedNodeIds,
scopeRuntime,
source: "extract",
});
applyRegionUpdates(graph, normalizedResult.regionUpdates, {
changedNodeIds,
source: "extract",
});
const batchStoryTimeResult = applyBatchStoryTime(
if (commitResult.success === false) return commitResult;
const postCommitResult = await applyExtractionPostCommit({
graph,
normalizedBatchStoryTime,
"extract",
);
updateRuntimeScopeState(graph, newNodeIds, scopeRuntime, extractionOwnerContext);
pendingLinkJobs: commitResult.pendingLinkJobs,
refMap: commitResult.refMap,
stats: commitResult.stats,
settings,
newNodeIds: commitResult.newNodeIds,
updatedNodeIds: commitResult.updatedNodeIds,
embeddingConfig,
signal,
effectiveEndSeq,
ownershipWarnings: draft.ownershipWarnings,
normalizedCognitionUpdates: draft.normalizedCognitionUpdates,
normalizedResult: draft.normalizedResult,
normalizedBatchStoryTime: commitResult.normalizedBatchStoryTime,
scopeRuntime,
extractionOwnerContext: draft.extractionOwnerContext,
suppressedDefaultPairKeys: commitResult.suppressedDefaultPairKeys,
});
debugLog(
`[ST-BME] 提取完成: 新建 ${stats.newNodes}, 更新 ${stats.updatedNodes}, 新边 ${stats.newEdges}, lastProcessedSeq=${graph.lastProcessedSeq}`,
`[ST-BME] 提取完成: 新建 ${commitResult.stats.newNodes}, 更新 ${commitResult.stats.updatedNodes}, 新边 ${commitResult.stats.newEdges}, lastProcessedSeq=${graph.lastProcessedSeq}`,
);
return {
success: true,
error: "",
...stats,
newNodeIds,
changedNodeIds,
ownerWarnings: ownershipWarnings,
batchStoryTime: normalizedBatchStoryTime,
batchStoryTimeResult,
...commitResult.stats,
newNodeIds: commitResult.newNodeIds,
changedNodeIds: postCommitResult.changedNodeIds,
ownerWarnings: draft.ownershipWarnings,
batchStoryTime: commitResult.normalizedBatchStoryTime,
batchStoryTimeResult: postCommitResult.batchStoryTimeResult,
processedRange: [effectiveStartSeq, effectiveEndSeq],
};
}