diff --git a/maintenance/extractor.js b/maintenance/extractor.js index ca422be..d9d8cd5 100644 --- a/maintenance/extractor.js +++ b/maintenance/extractor.js @@ -843,6 +843,272 @@ function applyOperationStoryTimeToNode( node.storyTimeSpan = createSpanFromStoryTime(null, source); } +function resolveExtractionDraft({ llmResult, schema, graph, scopeRuntime }) { + const llmFailure = + llmResult && typeof llmResult === "object" && "ok" in llmResult + ? llmResult + : null; + const result = llmFailure + ? llmFailure.ok + ? llmFailure.data + : null + : llmResult; + const normalizedResult = normalizeExtractionResultPayload(result, schema); + const ownershipWarnings = []; + const extractionOwnerContext = deriveExtractionOwnerContext( + graph, + normalizedResult, + scopeRuntime, + ); + const normalizedCognitionUpdates = normalizeCognitionUpdatesWithOwnerContext( + graph, + normalizedResult?.cognitionUpdates, + scopeRuntime, + extractionOwnerContext, + ownershipWarnings, + ); + + return { + llmFailure, + result, + normalizedResult, + ownershipWarnings, + extractionOwnerContext, + normalizedCognitionUpdates, + }; +} + +function validateExtractionDraft({ + draft, + lastProcessedSeq, +}) { + const { result, llmFailure, normalizedResult } = draft; + if (!normalizedResult || !Array.isArray(normalizedResult.operations)) { + const diagType = result === null + ? "null" + : Array.isArray(result) + ? `array(len=${result.length})` + : typeof result; + const diagKeys = isPlainObject(result) + ? Object.keys(result).slice(0, 10).join(", ") + : ""; + const diagPreview = typeof result === "string" + ? result.slice(0, 120) + : ""; + console.warn( + `[ST-BME] 提取 LLM 未返回有效操作 ` + + `[type=${diagType}]` + + (diagKeys ? ` [keys=${diagKeys}]` : "") + + (diagPreview ? ` [preview=${diagPreview}]` : "") + + (llmFailure?.ok === false && llmFailure?.errorType + ? ` [failureType=${String(llmFailure.errorType)}]` + : "") + + (llmFailure?.ok === false && llmFailure?.failureReason + ? ` [failureReason=${String(llmFailure.failureReason).slice(0, 200)}]` + : ""), + ); + const failureReason = + llmFailure?.ok === false + ? String(llmFailure.failureReason || "").trim() + : ""; + return { + success: false, + error: failureReason + ? `提取 LLM 未返回有效操作: ${failureReason}` + : "提取 LLM 未返回有效操作", + newNodes: 0, + updatedNodes: 0, + newEdges: 0, + newNodeIds: [], + processedRange: [lastProcessedSeq, lastProcessedSeq], + }; + } + return null; +} + +function commitExtractionPlan({ + graph, + normalizedResult, + currentSeq, + schema, + scopeRuntime, + extractionOwnerContext, + ownershipWarnings, + effectiveStartSeq, + effectiveEndSeq, +}) { + // 执行操作 + const stats = { newNodes: 0, updatedNodes: 0, newEdges: 0 }; + const newNodeIds = []; // v2: 收集新建节点 ID(用于进化引擎) + const updatedNodeIds = []; + const refMap = new Map(); + const pendingLinkJobs = []; + const suppressedDefaultPairKeys = new Set(); + const operationErrors = []; + const normalizedBatchStoryTime = normalizedResult?.batchStoryTime || null; + + for (const op of normalizedResult.operations) { + try { + switch (op.action) { + case "create": { + const createResult = handleCreate( + graph, + op, + currentSeq, + schema, + refMap, + stats, + scopeRuntime, + extractionOwnerContext, + ownershipWarnings, + normalizedBatchStoryTime, + ); + if (createResult?.nodeId) { + queueOperationLinks(pendingLinkJobs, createResult.nodeId, op.links); + } + if (createResult?.created === true && createResult.nodeId) { + newNodeIds.push(createResult.nodeId); + } + if (createResult?.updated === true && createResult.nodeId) { + updatedNodeIds.push(createResult.nodeId); + } + break; + } + case "update": + { + const updatedNodeId = handleUpdate( + graph, + op, + currentSeq, + stats, + scopeRuntime, + extractionOwnerContext, + ownershipWarnings, + normalizedBatchStoryTime, + ); + if (updatedNodeId) { + updatedNodeIds.push(updatedNodeId); + queueOperationLinks(pendingLinkJobs, updatedNodeId, op.links); + } + } + break; + case "delete": + handleDelete(graph, op, stats); + break; + case "_skip": + // Mem0 对照判定为重复,跳过 + break; + default: { + const message = `[ST-BME] 未知操作类型: ${op?.action ?? ""}`; + console.warn(message, op); + operationErrors.push(message); + break; + } + } + } catch (e) { + console.error(`[ST-BME] 操作执行失败:`, op, e); + operationErrors.push(e?.message || String(e)); + } + } + + if (operationErrors.length > 0) { + return { + success: false, + error: operationErrors.join(" | "), + ...stats, + newNodeIds, + processedRange: [effectiveStartSeq, effectiveEndSeq], + }; + } + + return { + success: true, + stats, + newNodeIds, + updatedNodeIds, + refMap, + pendingLinkJobs, + suppressedDefaultPairKeys, + normalizedBatchStoryTime, + }; +} + +async function applyExtractionPostCommit({ + graph, + pendingLinkJobs, + refMap, + stats, + settings, + newNodeIds, + updatedNodeIds, + embeddingConfig, + signal, + effectiveEndSeq, + ownershipWarnings, + normalizedCognitionUpdates, + normalizedResult, + normalizedBatchStoryTime, + scopeRuntime, + extractionOwnerContext, + suppressedDefaultPairKeys, +}) { + applyPendingLinks(graph, pendingLinkJobs, refMap, stats, { + suppressedDefaultPairKeys, + }); + applyDefaultBatchEdges( + graph, + [...new Set([...newNodeIds, ...updatedNodeIds])], + stats, + settings, + { + suppressedDefaultPairKeys, + }, + ); + + // 为新建节点生成 embedding。失败不应回滚整批图谱写入。 + try { + await generateNodeEmbeddings(graph, embeddingConfig, signal); + } catch (error) { + if (isAbortError(error)) { + throw error; + } + console.error("[ST-BME] 节点 embedding 生成失败,保留图谱写入:", error); + } + + // 更新处理进度:统一记录为已处理到的末个 chat 索引 + graph.lastProcessedSeq = Math.max( + graph.lastProcessedSeq ?? -1, + effectiveEndSeq, + ); + const changedNodeIds = [...new Set([...newNodeIds, ...updatedNodeIds])]; + if (ownershipWarnings.length > 0) { + debugWarn( + `[ST-BME] 已跳过 ${ownershipWarnings.length} 条缺少具体人物 owner 的主观记忆或认知更新`, + ); + } + applyCognitionUpdates(graph, normalizedCognitionUpdates, { + refMap, + changedNodeIds, + scopeRuntime, + source: "extract", + }); + applyRegionUpdates(graph, normalizedResult.regionUpdates, { + changedNodeIds, + source: "extract", + }); + const batchStoryTimeResult = applyBatchStoryTime( + graph, + normalizedBatchStoryTime, + "extract", + ); + updateRuntimeScopeState(graph, newNodeIds, scopeRuntime, extractionOwnerContext); + + return { + changedNodeIds, + batchStoryTimeResult, + }; +} + /** * 对未处理的对话楼层执行记忆提取 * @@ -1170,219 +1436,64 @@ export async function extractMemories({ returnFailureDetails: true, }); throwIfAborted(signal); - const llmFailure = - llmResult && typeof llmResult === "object" && "ok" in llmResult - ? llmResult - : null; - const result = llmFailure - ? llmFailure.ok - ? llmFailure.data - : null - : llmResult; - const normalizedResult = normalizeExtractionResultPayload(result, schema); - const ownershipWarnings = []; - const extractionOwnerContext = deriveExtractionOwnerContext( + const draft = resolveExtractionDraft({ + llmResult, + schema, graph, - normalizedResult, scopeRuntime, - ); - const normalizedCognitionUpdates = normalizeCognitionUpdatesWithOwnerContext( - graph, - normalizedResult?.cognitionUpdates, - scopeRuntime, - extractionOwnerContext, - ownershipWarnings, - ); - - if (!normalizedResult || !Array.isArray(normalizedResult.operations)) { - const diagType = result === null - ? "null" - : Array.isArray(result) - ? `array(len=${result.length})` - : typeof result; - const diagKeys = isPlainObject(result) - ? Object.keys(result).slice(0, 10).join(", ") - : ""; - const diagPreview = typeof result === "string" - ? result.slice(0, 120) - : ""; - console.warn( - `[ST-BME] 提取 LLM 未返回有效操作 ` + - `[type=${diagType}]` + - (diagKeys ? ` [keys=${diagKeys}]` : "") + - (diagPreview ? ` [preview=${diagPreview}]` : "") + - (llmFailure?.ok === false && llmFailure?.errorType - ? ` [failureType=${String(llmFailure.errorType)}]` - : "") + - (llmFailure?.ok === false && llmFailure?.failureReason - ? ` [failureReason=${String(llmFailure.failureReason).slice(0, 200)}]` - : ""), - ); - const failureReason = - llmFailure?.ok === false - ? String(llmFailure.failureReason || "").trim() - : ""; - return { - success: false, - error: failureReason - ? `提取 LLM 未返回有效操作: ${failureReason}` - : "提取 LLM 未返回有效操作", - newNodes: 0, - updatedNodes: 0, - newEdges: 0, - newNodeIds: [], - processedRange: [lastProcessedSeq, lastProcessedSeq], - }; - } - - // 执行操作 - const stats = { newNodes: 0, updatedNodes: 0, newEdges: 0 }; - const newNodeIds = []; // v2: 收集新建节点 ID(用于进化引擎) - const updatedNodeIds = []; - const refMap = new Map(); - const pendingLinkJobs = []; - const suppressedDefaultPairKeys = new Set(); - const operationErrors = []; - const normalizedBatchStoryTime = normalizedResult?.batchStoryTime || null; - - for (const op of normalizedResult.operations) { - try { - switch (op.action) { - case "create": { - const createResult = handleCreate( - graph, - op, - currentSeq, - schema, - refMap, - stats, - scopeRuntime, - extractionOwnerContext, - ownershipWarnings, - normalizedBatchStoryTime, - ); - if (createResult?.nodeId) { - queueOperationLinks(pendingLinkJobs, createResult.nodeId, op.links); - } - if (createResult?.created === true && createResult.nodeId) { - newNodeIds.push(createResult.nodeId); - } - if (createResult?.updated === true && createResult.nodeId) { - updatedNodeIds.push(createResult.nodeId); - } - break; - } - case "update": - { - const updatedNodeId = handleUpdate( - graph, - op, - currentSeq, - stats, - scopeRuntime, - extractionOwnerContext, - ownershipWarnings, - normalizedBatchStoryTime, - ); - if (updatedNodeId) { - updatedNodeIds.push(updatedNodeId); - queueOperationLinks(pendingLinkJobs, updatedNodeId, op.links); - } - } - break; - case "delete": - handleDelete(graph, op, stats); - break; - case "_skip": - // Mem0 对照判定为重复,跳过 - break; - default: { - const message = `[ST-BME] 未知操作类型: ${op?.action ?? ""}`; - console.warn(message, op); - operationErrors.push(message); - break; - } - } - } catch (e) { - console.error(`[ST-BME] 操作执行失败:`, op, e); - operationErrors.push(e?.message || String(e)); - } - } - - if (operationErrors.length > 0) { - return { - success: false, - error: operationErrors.join(" | "), - ...stats, - newNodeIds, - processedRange: [effectiveStartSeq, effectiveEndSeq], - }; - } - - applyPendingLinks(graph, pendingLinkJobs, refMap, stats, { - suppressedDefaultPairKeys, }); - applyDefaultBatchEdges( + const validationFailure = validateExtractionDraft({ + draft, + lastProcessedSeq, + }); + if (validationFailure) return validationFailure; + + const commitResult = commitExtractionPlan({ graph, - [...new Set([...newNodeIds, ...updatedNodeIds])], - stats, - settings, - { - suppressedDefaultPairKeys, - }, - ); - - // 为新建节点生成 embedding。失败不应回滚整批图谱写入。 - try { - await generateNodeEmbeddings(graph, embeddingConfig, signal); - } catch (error) { - if (isAbortError(error)) { - throw error; - } - console.error("[ST-BME] 节点 embedding 生成失败,保留图谱写入:", error); - } - - // 更新处理进度:统一记录为已处理到的末个 chat 索引 - graph.lastProcessedSeq = Math.max( - graph.lastProcessedSeq ?? -1, + normalizedResult: draft.normalizedResult, + currentSeq, + schema, + scopeRuntime, + extractionOwnerContext: draft.extractionOwnerContext, + ownershipWarnings: draft.ownershipWarnings, + effectiveStartSeq, effectiveEndSeq, - ); - const changedNodeIds = [...new Set([...newNodeIds, ...updatedNodeIds])]; - if (ownershipWarnings.length > 0) { - debugWarn( - `[ST-BME] 已跳过 ${ownershipWarnings.length} 条缺少具体人物 owner 的主观记忆或认知更新`, - ); - } - applyCognitionUpdates(graph, normalizedCognitionUpdates, { - refMap, - changedNodeIds, - scopeRuntime, - source: "extract", }); - applyRegionUpdates(graph, normalizedResult.regionUpdates, { - changedNodeIds, - source: "extract", - }); - const batchStoryTimeResult = applyBatchStoryTime( + if (commitResult.success === false) return commitResult; + + const postCommitResult = await applyExtractionPostCommit({ graph, - normalizedBatchStoryTime, - "extract", - ); - updateRuntimeScopeState(graph, newNodeIds, scopeRuntime, extractionOwnerContext); + pendingLinkJobs: commitResult.pendingLinkJobs, + refMap: commitResult.refMap, + stats: commitResult.stats, + settings, + newNodeIds: commitResult.newNodeIds, + updatedNodeIds: commitResult.updatedNodeIds, + embeddingConfig, + signal, + effectiveEndSeq, + ownershipWarnings: draft.ownershipWarnings, + normalizedCognitionUpdates: draft.normalizedCognitionUpdates, + normalizedResult: draft.normalizedResult, + normalizedBatchStoryTime: commitResult.normalizedBatchStoryTime, + scopeRuntime, + extractionOwnerContext: draft.extractionOwnerContext, + suppressedDefaultPairKeys: commitResult.suppressedDefaultPairKeys, + }); debugLog( - `[ST-BME] 提取完成: 新建 ${stats.newNodes}, 更新 ${stats.updatedNodes}, 新边 ${stats.newEdges}, lastProcessedSeq=${graph.lastProcessedSeq}`, + `[ST-BME] 提取完成: 新建 ${commitResult.stats.newNodes}, 更新 ${commitResult.stats.updatedNodes}, 新边 ${commitResult.stats.newEdges}, lastProcessedSeq=${graph.lastProcessedSeq}`, ); return { success: true, error: "", - ...stats, - newNodeIds, - changedNodeIds, - ownerWarnings: ownershipWarnings, - batchStoryTime: normalizedBatchStoryTime, - batchStoryTimeResult, + ...commitResult.stats, + newNodeIds: commitResult.newNodeIds, + changedNodeIds: postCommitResult.changedNodeIds, + ownerWarnings: draft.ownershipWarnings, + batchStoryTime: commitResult.normalizedBatchStoryTime, + batchStoryTimeResult: postCommitResult.batchStoryTimeResult, processedRange: [effectiveStartSeq, effectiveEndSeq], }; }