diff --git a/compressor.js b/compressor.js index 7be9800..41ff32f 100644 --- a/compressor.js +++ b/compressor.js @@ -1,23 +1,30 @@ // ST-BME: 层级压缩引擎 // 超过阈值的节点被 LLM 总结为更高层级的压缩节点 -import { createNode, addNode, createEdge, addEdge, getActiveNodes, getNode } from './graph.js'; -import { callLLMForJSON } from './llm.js'; -import { embedText } from './embedding.js'; -import { buildTaskPrompt } from './prompt-builder.js'; -import { applyTaskRegex } from './task-regex.js'; -import { isDirectVectorConfig } from './vector-index.js'; +import { embedText } from "./embedding.js"; +import { + addEdge, + addNode, + createEdge, + createNode, + getActiveNodes, + getNode, +} from "./graph.js"; +import { callLLMForJSON } from "./llm.js"; +import { buildTaskPrompt } from "./prompt-builder.js"; +import { applyTaskRegex } from "./task-regex.js"; +import { isDirectVectorConfig } from "./vector-index.js"; -function createAbortError(message = '操作已终止') { - const error = new Error(message); - error.name = 'AbortError'; - return error; +function createAbortError(message = "操作已终止") { + const error = new Error(message); + error.name = "AbortError"; + return error; } function throwIfAborted(signal) { - if (signal?.aborted) { - throw signal.reason instanceof Error ? signal.reason : createAbortError(); - } + if (signal?.aborted) { + throw signal.reason instanceof Error ? 
signal.reason : createAbortError(); + } } /** @@ -30,193 +37,234 @@ function throwIfAborted(signal) { * @param {boolean} [params.force=false] - 忽略阈值强制压缩 * @returns {Promise<{created: number, archived: number}>} */ -export async function compressType({ graph, typeDef, embeddingConfig, force = false, customPrompt, signal, settings = {} }) { - const compression = typeDef.compression; - if (!compression || compression.mode !== 'hierarchical') { - return { created: 0, archived: 0 }; - } +export async function compressType({ + graph, + typeDef, + embeddingConfig, + force = false, + customPrompt, + signal, + settings = {}, +}) { + const compression = typeDef.compression; + if (!compression || compression.mode !== "hierarchical") { + return { created: 0, archived: 0 }; + } - let totalCreated = 0; - let totalArchived = 0; + let totalCreated = 0; + let totalArchived = 0; - // 从最低层级开始逐层压缩 - for (let level = 0; level < compression.maxDepth; level++) { - throwIfAborted(signal); - const result = await compressLevel({ - graph, - typeDef, - level, - embeddingConfig, - force, - customPrompt, - signal, - settings, - }); + // 从最低层级开始逐层压缩 + for (let level = 0; level < compression.maxDepth; level++) { + throwIfAborted(signal); + const result = await compressLevel({ + graph, + typeDef, + level, + embeddingConfig, + force, + customPrompt, + signal, + settings, + }); - totalCreated += result.created; - totalArchived += result.archived; + totalCreated += result.created; + totalArchived += result.archived; - // 如果这一层没有压缩发生,停止 - if (result.created === 0) break; - } + // 如果这一层没有压缩发生,停止 + if (result.created === 0) break; + } - return { created: totalCreated, archived: totalArchived }; + return { created: totalCreated, archived: totalArchived }; } /** * 压缩特定层级的节点 */ -async function compressLevel({ graph, typeDef, level, embeddingConfig, force, customPrompt, signal, settings = {} }) { - const compression = typeDef.compression; - throwIfAborted(signal); +async function compressLevel({ + graph, + 
typeDef, + level, + embeddingConfig, + force, + customPrompt, + signal, + settings = {}, +}) { + const compression = typeDef.compression; + throwIfAborted(signal); - // 获取该层级的活跃叶子节点 - const levelNodes = getActiveNodes(graph, typeDef.id) - .filter(n => n.level === level) - .sort((a, b) => a.seq - b.seq); + // 获取该层级的活跃叶子节点 + const levelNodes = getActiveNodes(graph, typeDef.id) + .filter((n) => n.level === level) + .sort((a, b) => a.seq - b.seq); - const threshold = force ? Math.max(2, compression.fanIn) : compression.threshold; - const keepRecent = force ? 0 : compression.keepRecentLeaves; + const threshold = force + ? Math.max(2, compression.fanIn) + : compression.threshold; + const keepRecent = force ? 0 : compression.keepRecentLeaves; - // 不够阈值,无需压缩 - if (levelNodes.length <= threshold) { - return { created: 0, archived: 0 }; + // 不够阈值,无需压缩 + if (levelNodes.length <= threshold) { + return { created: 0, archived: 0 }; + } + + // 排除最近的节点 + const compressible = levelNodes.slice(0, levelNodes.length - keepRecent); + if (compressible.length < compression.fanIn) { + return { created: 0, archived: 0 }; + } + + let created = 0; + let archived = 0; + + // 按 fanIn 分组压缩 + for (let i = 0; i < compressible.length; i += compression.fanIn) { + const batch = compressible.slice(i, i + compression.fanIn); + if (batch.length < 2) break; // 至少 2 个才压缩 + + // 调用 LLM 总结 + const summaryResult = await summarizeBatch( + batch, + typeDef, + customPrompt, + signal, + settings, + ); + if (!summaryResult) continue; + + // 创建压缩节点 + const compressedNode = createNode({ + type: typeDef.id, + fields: summaryResult.fields, + seq: batch[batch.length - 1].seq, + seqRange: [ + batch[0].seqRange?.[0] ?? batch[0].seq, + batch[batch.length - 1].seqRange?.[1] ?? 
batch[batch.length - 1].seq, + ], + importance: Math.max(...batch.map((n) => n.importance)), + }); + + compressedNode.level = level + 1; + compressedNode.childIds = batch.map((n) => n.id); + + // 生成 embedding + if (isDirectVectorConfig(embeddingConfig) && summaryResult.fields.summary) { + const vec = await embedText( + summaryResult.fields.summary, + embeddingConfig, + { signal }, + ); + if (vec) compressedNode.embedding = Array.from(vec); } - // 排除最近的节点 - const compressible = levelNodes.slice(0, levelNodes.length - keepRecent); - if (compressible.length < compression.fanIn) { - return { created: 0, archived: 0 }; + addNode(graph, compressedNode); + migrateBatchEdges(graph, batch, compressedNode); + created++; + + // 归档子节点 + for (const child of batch) { + child.archived = true; + child.parentId = compressedNode.id; + archived++; } + } - let created = 0; - let archived = 0; - - // 按 fanIn 分组压缩 - for (let i = 0; i < compressible.length; i += compression.fanIn) { - const batch = compressible.slice(i, i + compression.fanIn); - if (batch.length < 2) break; // 至少 2 个才压缩 - - // 调用 LLM 总结 - const summaryResult = await summarizeBatch(batch, typeDef, customPrompt, signal, settings); - if (!summaryResult) continue; - - // 创建压缩节点 - const compressedNode = createNode({ - type: typeDef.id, - fields: summaryResult.fields, - seq: batch[batch.length - 1].seq, - seqRange: [batch[0].seqRange?.[0] ?? batch[0].seq, batch[batch.length - 1].seqRange?.[1] ?? 
batch[batch.length - 1].seq], - importance: Math.max(...batch.map(n => n.importance)), - }); - - compressedNode.level = level + 1; - compressedNode.childIds = batch.map(n => n.id); - - // 生成 embedding - if (isDirectVectorConfig(embeddingConfig) && summaryResult.fields.summary) { - const vec = await embedText(summaryResult.fields.summary, embeddingConfig, { signal }); - if (vec) compressedNode.embedding = Array.from(vec); - } - - addNode(graph, compressedNode); - migrateBatchEdges(graph, batch, compressedNode); - created++; - - // 归档子节点 - for (const child of batch) { - child.archived = true; - child.parentId = compressedNode.id; - archived++; - } - } - - return { created, archived }; + return { created, archived }; } function migrateBatchEdges(graph, batch, compressedNode) { - const batchIds = new Set(batch.map(node => node.id)); - const activeNodeIds = new Set(getActiveNodes(graph).map(node => node.id)); + const batchIds = new Set(batch.map((node) => node.id)); - for (const edge of graph.edges) { - if (edge.invalidAt || edge.expiredAt) continue; + for (const edge of graph.edges) { + if (edge.invalidAt || edge.expiredAt) continue; - const fromInside = batchIds.has(edge.fromId); - const toInside = batchIds.has(edge.toId); - if (!fromInside && !toInside) continue; - if (fromInside && toInside) continue; + const fromInside = batchIds.has(edge.fromId); + const toInside = batchIds.has(edge.toId); + if (!fromInside && !toInside) continue; + if (fromInside && toInside) continue; - const newFromId = fromInside ? compressedNode.id : edge.fromId; - const newToId = toInside ? compressedNode.id : edge.toId; + const newFromId = fromInside ? compressedNode.id : edge.fromId; + const newToId = toInside ? 
compressedNode.id : edge.toId; - if (newFromId === newToId) continue; - if (!activeNodeIds.has(newFromId) || !activeNodeIds.has(newToId)) continue; - if (!getNode(graph, newFromId) || !getNode(graph, newToId)) continue; + if (newFromId === newToId) continue; + if (!getNode(graph, newFromId) || !getNode(graph, newToId)) continue; - const migratedEdge = createEdge({ - fromId: newFromId, - toId: newToId, - relation: edge.relation, - strength: edge.strength, - edgeType: edge.edgeType, - }); - migratedEdge.validAt = edge.validAt ?? migratedEdge.validAt; - migratedEdge.invalidAt = edge.invalidAt ?? migratedEdge.invalidAt; - migratedEdge.expiredAt = edge.expiredAt ?? migratedEdge.expiredAt; + const migratedEdge = createEdge({ + fromId: newFromId, + toId: newToId, + relation: edge.relation, + strength: edge.strength, + edgeType: edge.edgeType, + }); + migratedEdge.validAt = edge.validAt ?? migratedEdge.validAt; + migratedEdge.invalidAt = edge.invalidAt ?? migratedEdge.invalidAt; + migratedEdge.expiredAt = edge.expiredAt ?? 
migratedEdge.expiredAt; - addEdge(graph, migratedEdge); - } + addEdge(graph, migratedEdge); + } } /** * 调用 LLM 总结一批节点 */ -async function summarizeBatch(nodes, typeDef, customPrompt, signal, settings = {}) { - const nodeDescriptions = nodes.map((n, i) => { - const fieldsStr = Object.entries(n.fields) - .filter(([_, v]) => v) - .map(([k, v]) => `${k}: ${v}`) - .join('\n '); - return `节点 ${i + 1} [楼层 ${n.seq}]:\n ${fieldsStr}`; - }).join('\n\n'); +async function summarizeBatch( + nodes, + typeDef, + customPrompt, + signal, + settings = {}, +) { + const nodeDescriptions = nodes + .map((n, i) => { + const fieldsStr = Object.entries(n.fields) + .filter(([_, v]) => v) + .map(([k, v]) => `${k}: ${v}`) + .join("\n "); + return `节点 ${i + 1} [楼层 ${n.seq}]:\n ${fieldsStr}`; + }) + .join("\n\n"); - const instruction = typeDef.compression.instruction || '将以下节点压缩总结为一条精炼记录。'; + const instruction = + typeDef.compression.instruction || "将以下节点压缩总结为一条精炼记录。"; - const compressPromptBuild = buildTaskPrompt(settings, 'compress', { - taskName: 'compress', - nodeContent: nodeDescriptions, - candidateNodes: nodeDescriptions, - currentRange: `${nodes[0]?.seq ?? '?'} ~ ${nodes[nodes.length - 1]?.seq ?? '?'}`, - graphStats: `node_count=${nodes.length}, node_type=${typeDef.id}`, - }); - const systemPrompt = applyTaskRegex( - settings, - 'compress', - 'finalPrompt', - compressPromptBuild.systemPrompt || customPrompt || [ - '你是一个记忆压缩器。将多个同类型节点总结为一条更高层级的压缩节点。', + const compressPromptBuild = buildTaskPrompt(settings, "compress", { + taskName: "compress", + nodeContent: nodeDescriptions, + candidateNodes: nodeDescriptions, + currentRange: `${nodes[0]?.seq ?? "?"} ~ ${nodes[nodes.length - 1]?.seq ?? 
"?"}`, + graphStats: `node_count=${nodes.length}, node_type=${typeDef.id}`, + }); + const systemPrompt = applyTaskRegex( + settings, + "compress", + "finalPrompt", + compressPromptBuild.systemPrompt || + customPrompt || + [ + "你是一个记忆压缩器。将多个同类型节点总结为一条更高层级的压缩节点。", instruction, - '', - '输出格式为严格 JSON:', - `{"fields": {${typeDef.columns.map(c => `"${c.name}": "..."`).join(', ')}}}`, - '', - '规则:', - '- 保留关键信息:因果关系、不可逆结果、未解决伏笔', - '- 去除重复和低信息密度内容', - '- 压缩后文本应精炼,目标 150 字左右', - ].join('\n'), - ); + "", + "输出格式为严格 JSON:", + `{"fields": {${typeDef.columns.map((c) => `"${c.name}": "..."`).join(", ")}}}`, + "", + "规则:", + "- 保留关键信息:因果关系、不可逆结果、未解决伏笔", + "- 去除重复和低信息密度内容", + "- 压缩后文本应精炼,目标 150 字左右", + ].join("\n"), + ); - const userPrompt = `请压缩以下 ${nodes.length} 个 "${typeDef.label}" 节点:\n\n${nodeDescriptions}`; + const userPrompt = `请压缩以下 ${nodes.length} 个 "${typeDef.label}" 节点:\n\n${nodeDescriptions}`; - return await callLLMForJSON({ - systemPrompt, - userPrompt, - maxRetries: 1, - signal, - taskType: 'compress', - additionalMessages: compressPromptBuild.customMessages || [], - }); + return await callLLMForJSON({ + systemPrompt, + userPrompt, + maxRetries: 1, + signal, + taskType: "compress", + additionalMessages: compressPromptBuild.customMessages || [], + }); } /** @@ -228,20 +276,36 @@ async function summarizeBatch(nodes, typeDef, customPrompt, signal, settings = { * @param {boolean} [force=false] * @returns {Promise<{created: number, archived: number}>} */ -export async function compressAll(graph, schema, embeddingConfig, force = false, customPrompt, signal, settings = {}) { - let totalCreated = 0; - let totalArchived = 0; +export async function compressAll( + graph, + schema, + embeddingConfig, + force = false, + customPrompt, + signal, + settings = {}, +) { + let totalCreated = 0; + let totalArchived = 0; - for (const typeDef of schema) { - throwIfAborted(signal); - if (typeDef.compression?.mode === 'hierarchical') { - const result = await compressType({ graph, typeDef, 
embeddingConfig, force, customPrompt, signal, settings }); - totalCreated += result.created; - totalArchived += result.archived; - } + for (const typeDef of schema) { + throwIfAborted(signal); + if (typeDef.compression?.mode === "hierarchical") { + const result = await compressType({ + graph, + typeDef, + embeddingConfig, + force, + customPrompt, + signal, + settings, + }); + totalCreated += result.created; + totalArchived += result.archived; } + } - return { created: totalCreated, archived: totalArchived }; + return { created: totalCreated, archived: totalArchived }; } // ==================== v2: 主动遗忘(SleepGate 启发) ==================== @@ -249,40 +313,45 @@ export async function compressAll(graph, schema, embeddingConfig, force = false, /** * 睡眠清理周期 * 评估每个节点的保留价值,低于阈值的归档(遗忘) - * + * * @param {object} graph - 图状态 * @param {object} settings - 包含 forgetThreshold 的设置 * @returns {{forgotten: number}} 本次遗忘的节点数 */ export function sleepCycle(graph, settings) { - const threshold = settings.forgetThreshold ?? 0.5; - const nodes = getActiveNodes(graph); - const now = Date.now(); - let forgotten = 0; + const threshold = settings.forgetThreshold ?? 
0.5; + const nodes = getActiveNodes(graph); + const now = Date.now(); + let forgotten = 0; - for (const node of nodes) { - // 跳过常驻类型(synopsis, rule 等重要节点不应被遗忘) - if (node.type === 'synopsis' || node.type === 'rule' || node.type === 'thread') continue; - // 跳过高重要性节点 - if (node.importance >= 8) continue; - // 跳过最近创建的节点(< 1 小时) - if (now - node.createdTime < 3600000) continue; + for (const node of nodes) { + // 跳过常驻类型(synopsis, rule 等重要节点不应被遗忘) + if ( + node.type === "synopsis" || + node.type === "rule" || + node.type === "thread" + ) + continue; + // 跳过高重要性节点 + if (node.importance >= 8) continue; + // 跳过最近创建的节点(< 1 小时) + if (now - node.createdTime < 3600000) continue; - // 计算保留价值 = importance × recency × (1 + accessFreq) - const ageHours = (now - node.createdTime) / 3600000; - const recency = 1 / (1 + Math.log10(1 + ageHours)); - const accessFreq = node.accessCount / Math.max(1, ageHours / 24); - const retentionValue = (node.importance / 10) * recency * (1 + accessFreq); + // 计算保留价值 = importance × recency × (1 + accessFreq) + const ageHours = (now - node.createdTime) / 3600000; + const recency = 1 / (1 + Math.log10(1 + ageHours)); + const accessFreq = node.accessCount / Math.max(1, ageHours / 24); + const retentionValue = (node.importance / 10) * recency * (1 + accessFreq); - if (retentionValue < threshold) { - node.archived = true; - forgotten++; - } + if (retentionValue < threshold) { + node.archived = true; + forgotten++; } + } - if (forgotten > 0) { - console.log(`[ST-BME] 主动遗忘: ${forgotten} 个低价值节点已归档`); - } + if (forgotten > 0) { + console.log(`[ST-BME] 主动遗忘: ${forgotten} 个低价值节点已归档`); + } - return { forgotten }; + return { forgotten }; } diff --git a/consolidator.js b/consolidator.js index 0b48dab..7d4156d 100644 --- a/consolidator.js +++ b/consolidator.js @@ -2,32 +2,32 @@ // 合并 Mem0 精确对照 + A-MEM 记忆进化为单一阶段 // 批量 embed + 批量查近邻 + 单次 LLM 调用 -import { embedBatch, searchSimilar } from './embedding.js'; -import { addEdge, createEdge, getActiveNodes, getNode } from 
'./graph.js'; -import { callLLMForJSON } from './llm.js'; -import { buildTaskPrompt } from './prompt-builder.js'; -import { applyTaskRegex } from './task-regex.js'; +import { embedBatch, searchSimilar } from "./embedding.js"; +import { addEdge, createEdge, getActiveNodes, getNode } from "./graph.js"; +import { callLLMForJSON } from "./llm.js"; +import { buildTaskPrompt } from "./prompt-builder.js"; +import { applyTaskRegex } from "./task-regex.js"; import { - buildNodeVectorText, - findSimilarNodesByText, - isDirectVectorConfig, - validateVectorConfig, -} from './vector-index.js'; + buildNodeVectorText, + findSimilarNodesByText, + isDirectVectorConfig, + validateVectorConfig, +} from "./vector-index.js"; -function createAbortError(message = '操作已终止') { - const error = new Error(message); - error.name = 'AbortError'; - return error; +function createAbortError(message = "操作已终止") { + const error = new Error(message); + error.name = "AbortError"; + return error; } function isAbortError(error) { - return error?.name === 'AbortError'; + return error?.name === "AbortError"; } function throwIfAborted(signal) { - if (signal?.aborted) { - throw signal.reason instanceof Error ? signal.reason : createAbortError(); - } + if (signal?.aborted) { + throw signal.reason instanceof Error ? signal.reason : createAbortError(); + } } /** @@ -102,368 +102,408 @@ const CONSOLIDATION_SYSTEM_PROMPT = `你是一个记忆整合分析器。当新 * @returns {Promise<{merged: number, skipped: number, kept: number, evolved: number, connections: number, updates: number}>} */ export async function consolidateMemories({ - graph, - newNodeIds, - embeddingConfig, - options = {}, - customPrompt, - signal, - settings = {}, + graph, + newNodeIds, + embeddingConfig, + options = {}, + customPrompt, + signal, + settings = {}, }) { - const neighborCount = options.neighborCount ?? 5; - const conflictThreshold = options.conflictThreshold ?? 
0.85; - const stats = { - merged: 0, - skipped: 0, - kept: 0, - evolved: 0, - connections: 0, - updates: 0, - }; + const neighborCount = options.neighborCount ?? 5; + const conflictThreshold = options.conflictThreshold ?? 0.85; + const stats = { + merged: 0, + skipped: 0, + kept: 0, + evolved: 0, + connections: 0, + updates: 0, + }; - if (!newNodeIds || newNodeIds.length === 0) return stats; - if (!validateVectorConfig(embeddingConfig).valid) { - console.log('[ST-BME] 记忆整合跳过:向量配置不可用'); - return stats; + if (!newNodeIds || newNodeIds.length === 0) return stats; + if (!validateVectorConfig(embeddingConfig).valid) { + console.log("[ST-BME] 记忆整合跳过:向量配置不可用"); + return stats; + } + + // ══════════════════════════════════════════════ + // Phase 0: 收集有效新节点 + // ══════════════════════════════════════════════ + const newEntries = []; + for (const id of newNodeIds) { + const node = getNode(graph, id); + if (!node || node.archived) continue; + const text = buildNodeVectorText(node); + if (!text) continue; + newEntries.push({ id, node, text }); + } + + if (newEntries.length === 0) return stats; + + const activeNodes = getActiveNodes(graph).filter((n) => { + const text = buildNodeVectorText(n); + return typeof text === "string" && text.length > 0; + }); + + if (activeNodes.length < 2) { + // 图中节点不够,全部 keep + stats.kept = newEntries.length; + return stats; + } + + throwIfAborted(signal); + console.log(`[ST-BME] 记忆整合开始: ${newEntries.length} 个新节点`); + + // ══════════════════════════════════════════════ + // Phase 1 + 2: 批量 Embed + 查近邻 + // ══════════════════════════════════════════════ + /** @type {Map>} */ + const neighborsMap = new Map(); + + if (isDirectVectorConfig(embeddingConfig)) { + // ── 直连模式: 1 次 embedBatch + N 次本地 cosine ── + const texts = newEntries.map((e) => e.text); + let queryVectors; + + try { + queryVectors = await embedBatch(texts, embeddingConfig, { signal }); + } catch (e) { + if (isAbortError(e)) throw e; + console.warn("[ST-BME] 批量 embed 失败,回退到逐条:", 
e.message); + queryVectors = null; } - // ══════════════════════════════════════════════ - // Phase 0: 收集有效新节点 - // ══════════════════════════════════════════════ - const newEntries = []; - for (const id of newNodeIds) { - const node = getNode(graph, id); - if (!node || node.archived) continue; - const text = buildNodeVectorText(node); - if (!text) continue; - newEntries.push({ id, node, text }); - } - - if (newEntries.length === 0) return stats; - - const activeNodes = getActiveNodes(graph).filter(n => { - const text = buildNodeVectorText(n); - return typeof text === 'string' && text.length > 0; - }); - - if (activeNodes.length < 2) { - // 图中节点不够,全部 keep - stats.kept = newEntries.length; - return stats; - } - - throwIfAborted(signal); - console.log(`[ST-BME] 记忆整合开始: ${newEntries.length} 个新节点`); - - // ══════════════════════════════════════════════ - // Phase 1 + 2: 批量 Embed + 查近邻 - // ══════════════════════════════════════════════ - /** @type {Map>} */ - const neighborsMap = new Map(); - - if (isDirectVectorConfig(embeddingConfig)) { - // ── 直连模式: 1 次 embedBatch + N 次本地 cosine ── - const texts = newEntries.map(e => e.text); - let queryVectors; - - try { - queryVectors = await embedBatch(texts, embeddingConfig, { signal }); - } catch (e) { - if (isAbortError(e)) throw e; - console.warn('[ST-BME] 批量 embed 失败,回退到逐条:', e.message); - queryVectors = null; - } - - // 构建候选池(含 embedding 的活跃节点) - const candidatePool = activeNodes - .filter(n => Array.isArray(n.embedding) && n.embedding.length > 0) - .map(n => ({ nodeId: n.id, embedding: n.embedding })); - - for (let i = 0; i < newEntries.length; i++) { - throwIfAborted(signal); - const entry = newEntries[i]; - const candidates = candidatePool.filter(c => c.nodeId !== entry.id); - - if (queryVectors?.[i] && candidates.length > 0) { - // 本地 cosine 搜索(0 API 调用) - const neighbors = searchSimilar(queryVectors[i], candidates, neighborCount); - neighborsMap.set(entry.id, neighbors); - } else { - // fallback: 逐条 embed - try { - 
const neighbors = await findSimilarNodesByText( - graph, entry.text, embeddingConfig, neighborCount, - activeNodes.filter(n => n.id !== entry.id), signal, - ); - neighborsMap.set(entry.id, neighbors); - } catch (e) { - if (isAbortError(e)) throw e; - console.warn(`[ST-BME] 近邻查询失败 (${entry.id}):`, e.message); - neighborsMap.set(entry.id, []); - } - } - } - } else { - // ── 后端模式: 逐条 /api/vector/query ── - for (let i = 0; i < newEntries.length; i++) { - throwIfAborted(signal); - const entry = newEntries[i]; - try { - const neighbors = await findSimilarNodesByText( - graph, entry.text, embeddingConfig, neighborCount, - activeNodes.filter(n => n.id !== entry.id), signal, - ); - neighborsMap.set(entry.id, neighbors); - } catch (e) { - if (isAbortError(e)) throw e; - console.warn(`[ST-BME] 近邻查询失败 (${entry.id}):`, e.message); - neighborsMap.set(entry.id, []); - } - } - } - - // ══════════════════════════════════════════════ - // Phase 3: 单次 LLM 批量判定 - // ══════════════════════════════════════════════ - throwIfAborted(signal); - - const userPromptSections = []; - userPromptSections.push(`本轮共新增 ${newEntries.length} 条记忆,请逐条分析:\n`); + // 构建候选池(含 embedding 的活跃节点) + const candidatePool = activeNodes + .filter((n) => Array.isArray(n.embedding) && n.embedding.length > 0) + .map((n) => ({ nodeId: n.id, embedding: n.embedding })); for (let i = 0; i < newEntries.length; i++) { - const entry = newEntries[i]; - const neighbors = neighborsMap.get(entry.id) || []; + throwIfAborted(signal); + const entry = newEntries[i]; + const candidates = candidatePool.filter((c) => c.nodeId !== entry.id); - const newNodeFieldsStr = Object.entries(entry.node.fields) - .map(([k, v]) => `${k}: ${v}`) - .join(', '); - - // 构建近邻描述 - let neighborText; - if (neighbors.length === 0) { - neighborText = ' (无近邻命中)'; - } else { - neighborText = neighbors.map(n => { - const node = getNode(graph, n.nodeId); - if (!node) return null; - const fieldsStr = Object.entries(node.fields) - .map(([k, v]) => `${k}: ${v}`) - 
.join(', '); - return ` - [${node.id}] 类型=${node.type}, ${fieldsStr} (相似度=${n.score.toFixed(3)})`; - }).filter(Boolean).join('\n'); - } - - // 检查高相似度 - const hasHighSimilarity = neighbors.length > 0 && neighbors[0].score > conflictThreshold; - const hint = hasHighSimilarity - ? ` ⚠ 最高相似度 ${neighbors[0].score.toFixed(3)} 超过阈值 ${conflictThreshold}` - : ''; - - userPromptSections.push([ - `### 新记忆 #${i + 1}`, - `[${entry.id}] 类型=${entry.node.type}, ${newNodeFieldsStr}`, - '近邻记忆:', - neighborText, - hint, - ].filter(Boolean).join('\n')); - } - - const userPrompt = userPromptSections.join('\n\n'); - - let decision; - const consolidationPromptBuild = buildTaskPrompt(settings, 'consolidation', { - taskName: 'consolidation', - candidateNodes: userPrompt, - candidateText: userPrompt, - graphStats: `new_entries=${newEntries.length}, threshold=${conflictThreshold}`, - }); - try { - decision = await callLLMForJSON({ - systemPrompt: applyTaskRegex( - settings, - 'consolidation', - 'finalPrompt', - consolidationPromptBuild.systemPrompt || customPrompt || CONSOLIDATION_SYSTEM_PROMPT, - ), - userPrompt, - maxRetries: 1, + if (queryVectors?.[i] && candidates.length > 0) { + // 本地 cosine 搜索(0 API 调用) + const neighbors = searchSimilar( + queryVectors[i], + candidates, + neighborCount, + ); + neighborsMap.set(entry.id, neighbors); + } else { + // fallback: 逐条 embed + try { + const neighbors = await findSimilarNodesByText( + graph, + entry.text, + embeddingConfig, + neighborCount, + activeNodes.filter((n) => n.id !== entry.id), signal, - taskType: 'consolidation', - additionalMessages: consolidationPromptBuild.customMessages || [], - }); - } catch (e) { - if (isAbortError(e)) throw e; - console.error('[ST-BME] 记忆整合 LLM 调用失败:', e); - stats.kept = newEntries.length; - return stats; - } - - // ══════════════════════════════════════════════ - // Phase 4: 逐个处理结果 - // ══════════════════════════════════════════════ - - // 解析 LLM 返回——兼容单条和批量格式 - let results; - if 
(Array.isArray(decision?.results)) { - results = decision.results; - } else if (decision?.action) { - // 单条返回格式(LLM 可能忽略 results 包装) - results = [{ ...decision, node_id: newEntries[0]?.id }]; - } else { - console.warn('[ST-BME] 记忆整合: LLM 返回格式异常,全部 keep'); - stats.kept = newEntries.length; - return stats; - } - - // 建立 node_id → result 的映射 - const resultMap = new Map(); - for (const r of results) { - if (r.node_id) resultMap.set(r.node_id, r); - } - - // 处理每个新节点 - for (const entry of newEntries) { - const result = resultMap.get(entry.id); - if (!result) { - // LLM 未返回此节点的结果,fallback 为 keep - stats.kept++; - continue; + ); + neighborsMap.set(entry.id, neighbors); + } catch (e) { + if (isAbortError(e)) throw e; + console.warn(`[ST-BME] 近邻查询失败 (${entry.id}):`, e.message); + neighborsMap.set(entry.id, []); } + } + } + } else { + // ── 后端模式: 逐条 /api/vector/query ── + for (let i = 0; i < newEntries.length; i++) { + throwIfAborted(signal); + const entry = newEntries[i]; + try { + const neighbors = await findSimilarNodesByText( + graph, + entry.text, + embeddingConfig, + neighborCount, + activeNodes.filter((n) => n.id !== entry.id), + signal, + ); + neighborsMap.set(entry.id, neighbors); + } catch (e) { + if (isAbortError(e)) throw e; + console.warn(`[ST-BME] 近邻查询失败 (${entry.id}):`, e.message); + neighborsMap.set(entry.id, []); + } + } + } - processOneResult(graph, entry, result, stats); + // ══════════════════════════════════════════════ + // Phase 3: 单次 LLM 批量判定 + // ══════════════════════════════════════════════ + throwIfAborted(signal); + + const userPromptSections = []; + userPromptSections.push( + `本轮共新增 ${newEntries.length} 条记忆,请逐条分析:\n`, + ); + + for (let i = 0; i < newEntries.length; i++) { + const entry = newEntries[i]; + const neighbors = neighborsMap.get(entry.id) || []; + + const newNodeFieldsStr = Object.entries(entry.node.fields) + .map(([k, v]) => `${k}: ${v}`) + .join(", "); + + // 构建近邻描述 + let neighborText; + if (neighbors.length === 0) { + neighborText = 
" (无近邻命中)"; + } else { + neighborText = neighbors + .map((n) => { + const node = getNode(graph, n.nodeId); + if (!node) return null; + const fieldsStr = Object.entries(node.fields) + .map(([k, v]) => `${k}: ${v}`) + .join(", "); + return ` - [${node.id}] 类型=${node.type}, ${fieldsStr} (相似度=${n.score.toFixed(3)})`; + }) + .filter(Boolean) + .join("\n"); } - // 日志 - const actionSummary = []; - if (stats.merged > 0) actionSummary.push(`合并 ${stats.merged}`); - if (stats.skipped > 0) actionSummary.push(`跳过 ${stats.skipped}`); - if (stats.kept > 0) actionSummary.push(`保留 ${stats.kept}`); - if (stats.evolved > 0) actionSummary.push(`进化 ${stats.evolved}`); - if (stats.connections > 0) actionSummary.push(`新链接 ${stats.connections}`); - if (stats.updates > 0) actionSummary.push(`回溯更新 ${stats.updates}`); + // 检查高相似度 + const hasHighSimilarity = + neighbors.length > 0 && neighbors[0].score > conflictThreshold; + const hint = hasHighSimilarity + ? ` ⚠ 最高相似度 ${neighbors[0].score.toFixed(3)} 超过阈值 ${conflictThreshold}` + : ""; - if (actionSummary.length > 0) { - console.log(`[ST-BME] 记忆整合完成: ${actionSummary.join(', ')}`); - } + userPromptSections.push( + [ + `### 新记忆 #${i + 1}`, + `[${entry.id}] 类型=${entry.node.type}, ${newNodeFieldsStr}`, + "近邻记忆:", + neighborText, + hint, + ] + .filter(Boolean) + .join("\n"), + ); + } + const userPrompt = userPromptSections.join("\n\n"); + + let decision; + const consolidationPromptBuild = buildTaskPrompt(settings, "consolidation", { + taskName: "consolidation", + candidateNodes: userPrompt, + candidateText: userPrompt, + graphStats: `new_entries=${newEntries.length}, threshold=${conflictThreshold}`, + }); + try { + decision = await callLLMForJSON({ + systemPrompt: applyTaskRegex( + settings, + "consolidation", + "finalPrompt", + consolidationPromptBuild.systemPrompt || + customPrompt || + CONSOLIDATION_SYSTEM_PROMPT, + ), + userPrompt, + maxRetries: 1, + signal, + taskType: "consolidation", + additionalMessages: 
consolidationPromptBuild.customMessages || [], + }); + } catch (e) { + if (isAbortError(e)) throw e; + console.error("[ST-BME] 记忆整合 LLM 调用失败:", e); + stats.kept = newEntries.length; return stats; + } + + // ══════════════════════════════════════════════ + // Phase 4: 逐个处理结果 + // ══════════════════════════════════════════════ + + // 解析 LLM 返回——兼容单条和批量格式 + let results; + if (Array.isArray(decision?.results)) { + results = decision.results; + } else if (decision?.action) { + // 单条返回格式(LLM 可能忽略 results 包装) + results = [{ ...decision, node_id: newEntries[0]?.id }]; + } else { + console.warn("[ST-BME] 记忆整合: LLM 返回格式异常,全部 keep"); + stats.kept = newEntries.length; + return stats; + } + + // 建立 node_id → result 的映射 + const resultMap = new Map(); + for (const r of results) { + if (r.node_id) resultMap.set(r.node_id, r); + } + + // 处理每个新节点 + for (const entry of newEntries) { + const result = resultMap.get(entry.id); + if (!result) { + // LLM 未返回此节点的结果,fallback 为 keep + stats.kept++; + continue; + } + + processOneResult(graph, entry, result, stats); + } + + // 日志 + const actionSummary = []; + if (stats.merged > 0) actionSummary.push(`合并 ${stats.merged}`); + if (stats.skipped > 0) actionSummary.push(`跳过 ${stats.skipped}`); + if (stats.kept > 0) actionSummary.push(`保留 ${stats.kept}`); + if (stats.evolved > 0) actionSummary.push(`进化 ${stats.evolved}`); + if (stats.connections > 0) actionSummary.push(`新链接 ${stats.connections}`); + if (stats.updates > 0) actionSummary.push(`回溯更新 ${stats.updates}`); + + if (actionSummary.length > 0) { + console.log(`[ST-BME] 记忆整合完成: ${actionSummary.join(", ")}`); + } + + return stats; } /** * 处理单个节点的整合结果 */ function processOneResult(graph, entry, result, stats) { - const { id: newId, node: newNode } = entry; + const { id: newId, node: newNode } = entry; - // ── 处理 action ── - switch (result.action) { - case 'skip': { - console.log(`[ST-BME] 记忆整合: skip (重复) — ${newId}`); - newNode.archived = true; - stats.skipped++; - break; - } - - case 'merge': { - 
const targetId = result.merge_target_id; - const targetNode = targetId ? getNode(graph, targetId) : null; - - if (targetNode && !targetNode.archived) { - console.log(`[ST-BME] 记忆整合: merge ${newId} → ${targetId}`); - - if (result.merged_fields && typeof result.merged_fields === 'object') { - for (const [key, value] of Object.entries(result.merged_fields)) { - if (value != null && value !== '') { - targetNode.fields[key] = value; - } - } - } else { - for (const [key, value] of Object.entries(newNode.fields)) { - if (value != null && value !== '' && !targetNode.fields[key]) { - targetNode.fields[key] = value; - } - } - } - - if (Number.isFinite(newNode.seq) && newNode.seq > (targetNode.seq || 0)) { - targetNode.seq = newNode.seq; - } - - targetNode.embedding = null; - newNode.archived = true; - stats.merged++; - } else { - console.warn(`[ST-BME] 记忆整合: merge target ${targetId} 不存在,回退为 keep`); - stats.kept++; - } - break; - } - - case 'keep': - default: { - stats.kept++; - break; - } + // ── 处理 action ── + switch (result.action) { + case "skip": { + console.log(`[ST-BME] 记忆整合: skip (重复) — ${newId}`); + newNode.archived = true; + stats.skipped++; + break; } - // ── 处理 evolution ── - const evolution = result.evolution; - if (evolution?.should_evolve && !newNode.archived) { - stats.evolved++; - console.log(`[ST-BME] 记忆整合/进化触发: ${result.reason || '(无理由)'}`); + case "merge": { + const targetId = result.merge_target_id; + const targetNode = targetId ? 
getNode(graph, targetId) : null; - if (Array.isArray(evolution.connections)) { - for (const targetId of evolution.connections) { - if (!getNode(graph, targetId)) continue; - const edge = createEdge({ - fromId: newId, - toId: targetId, - relation: 'related', - strength: 0.7, - }); - if (addEdge(graph, edge)) { - stats.connections++; - } + if (targetNode && !targetNode.archived) { + console.log(`[ST-BME] 记忆整合: merge ${newId} → ${targetId}`); + + if (result.merged_fields && typeof result.merged_fields === "object") { + for (const [key, value] of Object.entries(result.merged_fields)) { + if (value != null && value !== "") { + targetNode.fields[key] = value; } + } + } else { + for (const [key, value] of Object.entries(newNode.fields)) { + if (value != null && value !== "" && !targetNode.fields[key]) { + targetNode.fields[key] = value; + } + } } - if (Array.isArray(evolution.neighbor_updates)) { - for (const update of evolution.neighbor_updates) { - if (!update.nodeId) continue; - const oldNode = getNode(graph, update.nodeId); - if (!oldNode || oldNode.archived) continue; - - let changed = false; - - if (update.newContext && typeof update.newContext === 'string') { - if (oldNode.fields.state !== undefined) { - oldNode.fields.state = update.newContext; - changed = true; - } else if (oldNode.fields.summary !== undefined) { - oldNode.fields.summary = update.newContext; - changed = true; - } else if (oldNode.fields.core_note !== undefined) { - oldNode.fields.core_note = update.newContext; - changed = true; - } - } - - if (update.newTags && Array.isArray(update.newTags)) { - oldNode.clusters = update.newTags; - changed = true; - } - - if (changed) { - oldNode.embedding = null; - if (!oldNode._evolutionHistory) oldNode._evolutionHistory = []; - oldNode._evolutionHistory.push({ - triggeredBy: newId, - timestamp: Date.now(), - reason: result.reason || '', - }); - stats.updates++; - } - } + if ( + Number.isFinite(newNode.seq) && + newNode.seq > (targetNode.seq || 0) + ) { + 
targetNode.seq = newNode.seq; } + + const targetRange = Array.isArray(targetNode.seqRange) + ? targetNode.seqRange + : [targetNode.seq || 0, targetNode.seq || 0]; + const newRange = Array.isArray(newNode.seqRange) + ? newNode.seqRange + : [newNode.seq || 0, newNode.seq || 0]; + targetNode.seqRange = [ + Math.min(targetRange[0], newRange[0]), + Math.max(targetRange[1], newRange[1]), + ]; + + targetNode.embedding = null; + newNode.archived = true; + stats.merged++; + } else { + console.warn( + `[ST-BME] 记忆整合: merge target ${targetId} 不存在,回退为 keep`, + ); + stats.kept++; + } + break; } + + case "keep": + default: { + stats.kept++; + break; + } + } + + // ── 处理 evolution ── + const evolution = result.evolution; + if (evolution?.should_evolve && !newNode.archived) { + stats.evolved++; + console.log(`[ST-BME] 记忆整合/进化触发: ${result.reason || "(无理由)"}`); + + if (Array.isArray(evolution.connections)) { + for (const targetId of evolution.connections) { + if (!getNode(graph, targetId)) continue; + const edge = createEdge({ + fromId: newId, + toId: targetId, + relation: "related", + strength: 0.7, + }); + if (addEdge(graph, edge)) { + stats.connections++; + } + } + } + + if (Array.isArray(evolution.neighbor_updates)) { + for (const update of evolution.neighbor_updates) { + if (!update.nodeId) continue; + const oldNode = getNode(graph, update.nodeId); + if (!oldNode || oldNode.archived) continue; + + let changed = false; + + if (update.newContext && typeof update.newContext === "string") { + if (oldNode.fields.state !== undefined) { + oldNode.fields.state = update.newContext; + changed = true; + } else if (oldNode.fields.summary !== undefined) { + oldNode.fields.summary = update.newContext; + changed = true; + } else if (oldNode.fields.core_note !== undefined) { + oldNode.fields.core_note = update.newContext; + changed = true; + } + } + + if (update.newTags && Array.isArray(update.newTags)) { + oldNode.clusters = update.newTags; + changed = true; + } + + if (changed) { + 
oldNode.embedding = null; + if (!oldNode._evolutionHistory) oldNode._evolutionHistory = []; + oldNode._evolutionHistory.push({ + triggeredBy: newId, + timestamp: Date.now(), + reason: result.reason || "", + }); + stats.updates++; + } + } + } + } } diff --git a/extractor.js b/extractor.js index f4f9d9c..c748d2d 100644 --- a/extractor.js +++ b/extractor.js @@ -15,17 +15,11 @@ import { updateNode, } from "./graph.js"; import { callLLMForJSON } from "./llm.js"; -import { - ensureEventTitle, - getNodeDisplayName, -} from "./node-labels.js"; +import { ensureEventTitle, getNodeDisplayName } from "./node-labels.js"; import { buildTaskPrompt } from "./prompt-builder.js"; -import { applyTaskRegex } from "./task-regex.js"; import { RELATION_TYPES } from "./schema.js"; -import { - buildNodeVectorText, - isDirectVectorConfig, -} from "./vector-index.js"; +import { applyTaskRegex } from "./task-regex.js"; +import { buildNodeVectorText, isDirectVectorConfig } from "./vector-index.js"; function createAbortError(message = "操作已终止") { const error = new Error(message); @@ -39,9 +33,7 @@ function isAbortError(error) { function throwIfAborted(signal) { if (signal?.aborted) { - throw signal.reason instanceof Error - ? signal.reason - : createAbortError(); + throw signal.reason instanceof Error ? signal.reason : createAbortError(); } } @@ -84,8 +76,6 @@ export async function extractMemories({ }; } - - const effectiveStartSeq = Number.isFinite(startSeq) ? startSeq : (messages.find((m) => Number.isFinite(m.seq))?.seq ?? 
@@ -134,7 +124,9 @@ export async function extractMemories({ settings, "extract", "finalPrompt", - promptBuild.systemPrompt || extractPrompt || buildDefaultExtractPrompt(schema), + promptBuild.systemPrompt || + extractPrompt || + buildDefaultExtractPrompt(schema), ); // 用户提示词 @@ -175,12 +167,11 @@ export async function extractMemories({ }; } - - // 执行操作 const stats = { newNodes: 0, updatedNodes: 0, newEdges: 0 }; const newNodeIds = []; // v2: 收集新建节点 ID(用于进化引擎) const refMap = new Map(); + const operationErrors = []; for (const op of result.operations) { try { @@ -206,14 +197,29 @@ export async function extractMemories({ case "_skip": // Mem0 对照判定为重复,跳过 break; - default: - console.warn(`[ST-BME] 未知操作类型: ${op.action}`); + default: { + const message = `[ST-BME] 未知操作类型: ${op?.action ?? ""}`; + console.warn(message, op); + operationErrors.push(message); + break; + } } } catch (e) { console.error(`[ST-BME] 操作执行失败:`, op, e); + operationErrors.push(e?.message || String(e)); } } + if (operationErrors.length > 0) { + return { + success: false, + error: operationErrors.join(" | "), + ...stats, + newNodeIds, + processedRange: [effectiveStartSeq, effectiveEndSeq], + }; + } + // 为新建节点生成 embedding。失败不应回滚整批图谱写入。 try { await generateNodeEmbeddings(graph, embeddingConfig, signal); @@ -248,7 +254,7 @@ export async function extractMemories({ */ function handleCreate(graph, op, seq, schema, refMap, stats) { const normalizedFields = - op.type === "event" ? ensureEventTitle(op.fields || {}) : (op.fields || {}); + op.type === "event" ? 
ensureEventTitle(op.fields || {}) : op.fields || {}; const typeDef = schema.find((s) => s.id === op.type); if (!typeDef) { console.warn(`[ST-BME] 未知节点类型: ${op.type}`); @@ -480,7 +486,9 @@ async function generateNodeEmbeddings(graph, embeddingConfig, signal) { if (needsEmbedding.length === 0) return; - const texts = needsEmbedding.map((node) => buildNodeVectorText(node) || node.type); + const texts = needsEmbedding.map( + (node) => buildNodeVectorText(node) || node.type, + ); console.log(`[ST-BME] 为 ${texts.length} 个节点生成 embedding`); @@ -591,7 +599,14 @@ function buildDefaultExtractPrompt(schema) { * @param {number} params.currentSeq * @returns {Promise} */ -export async function generateSynopsis({ graph, schema, currentSeq, customPrompt, signal, settings = {} }) { +export async function generateSynopsis({ + graph, + schema, + currentSeq, + customPrompt, + signal, + settings = {}, +}) { const eventNodes = getActiveNodes(graph, "event").sort( (a, b) => a.seq - b.seq, ); @@ -623,11 +638,13 @@ export async function generateSynopsis({ graph, schema, currentSeq, customPrompt settings, "synopsis", "finalPrompt", - synopsisPromptBuild.systemPrompt || customPrompt || [ - "你是故事概要生成器。根据事件线、角色和主线生成简洁的前情提要。", - '输出 JSON:{"summary": "前情提要文本(200字以内)"}', - "要求:涵盖核心冲突、关键转折、主要角色当前状态。", - ].join("\n"), + synopsisPromptBuild.systemPrompt || + customPrompt || + [ + "你是故事概要生成器。根据事件线、角色和主线生成简洁的前情提要。", + '输出 JSON:{"summary": "前情提要文本(200字以内)"}', + "要求:涵盖核心冲突、关键转折、主要角色当前状态。", + ].join("\n"), ); const result = await callLLMForJSON({ @@ -677,7 +694,13 @@ export async function generateSynopsis({ graph, schema, currentSeq, customPrompt } } -export async function generateReflection({ graph, currentSeq, customPrompt, signal, settings = {} }) { +export async function generateReflection({ + graph, + currentSeq, + customPrompt, + signal, + settings = {}, +}) { const recentEvents = getActiveNodes(graph, "event") .sort((a, b) => b.seq - a.seq) .slice(0, 6) @@ -728,14 +751,16 @@ export async function 
generateReflection({ graph, currentSeq, customPrompt, sign settings, "reflection", "finalPrompt", - reflectionPromptBuild.systemPrompt || customPrompt || [ - "你是 RP 长期记忆系统的反思生成器。", - '输出严格 JSON:{"insight":"...","trigger":"...","suggestion":"...","importance":1-10}', - "insight 应总结最近情节中最值得长期保留的变化、关系趋势或潜在线索。", - "trigger 说明触发这条反思的关键事件或矛盾。", - "suggestion 给出后续检索或叙事上值得关注的提示。", - "不要复述全部事件,要提炼高层结论。", - ].join("\n"), + reflectionPromptBuild.systemPrompt || + customPrompt || + [ + "你是 RP 长期记忆系统的反思生成器。", + '输出严格 JSON:{"insight":"...","trigger":"...","suggestion":"...","importance":1-10}', + "insight 应总结最近情节中最值得长期保留的变化、关系趋势或潜在线索。", + "trigger 说明触发这条反思的关键事件或矛盾。", + "suggestion 给出后续检索或叙事上值得关注的提示。", + "不要复述全部事件,要提炼高层结论。", + ].join("\n"), ); const result = await callLLMForJSON({ diff --git a/index.js b/index.js index e4d4103..0d7da09 100644 --- a/index.js +++ b/index.js @@ -33,6 +33,10 @@ import { estimateTokens, formatInjection } from "./injector.js"; import { fetchMemoryLLMModels, testLLMConnection } from "./llm.js"; import { getNodeDisplayName } from "./node-labels.js"; import { showManagedBmeNotice } from "./notice.js"; +import { + createDefaultTaskProfiles, + migrateLegacyTaskProfiles, +} from "./prompt-profiles.js"; import { retrieve } from "./retriever.js"; import { appendBatchJournal, @@ -59,7 +63,6 @@ import { testVectorConnection, validateVectorConfig, } from "./vector-index.js"; -import { createDefaultTaskProfiles, migrateLegacyTaskProfiles } from "./prompt-profiles.js"; // 操控面板模块(动态加载,防止加载失败崩溃整个扩展) let _panelModule = null; @@ -198,9 +201,8 @@ let sendIntentHookRetryTimer = null; let pendingHistoryRecoveryTimer = null; let pendingHistoryRecoveryTrigger = ""; let pendingHistoryMutationCheckTimers = []; -let skipBeforeCombineRecallUntil = 0; -let lastPreGenerationRecallKey = ""; -let lastPreGenerationRecallAt = 0; +const generationRecallTransactions = new Map(); +const GENERATION_RECALL_TRANSACTION_TTL_MS = 15000; const stageNoticeHandles = { extraction: null, vector: 
null, @@ -956,6 +958,15 @@ function updateProcessedHistorySnapshot(chat, lastProcessedAssistantFloor) { currentGraph.lastProcessedSeq = lastProcessedAssistantFloor; } +function shouldAdvanceProcessedHistory(batchStatus) { + if (!batchStatus || typeof batchStatus !== "object") return false; + return ( + batchStatus.completed === true && + batchStatus.outcome === "success" && + batchStatus.consistency === "strong" + ); +} + function computePostProcessArtifacts( beforeSnapshot, afterSnapshot, @@ -1648,6 +1659,111 @@ function buildPreGenerationRecallKey(type, options = {}) { ].join(":"); } +function cleanupGenerationRecallTransactions(now = Date.now()) { + for (const [ + transactionId, + transaction, + ] of generationRecallTransactions.entries()) { + if ( + !transaction || + now - (transaction.updatedAt || 0) > GENERATION_RECALL_TRANSACTION_TTL_MS + ) { + generationRecallTransactions.delete(transactionId); + } + } +} + +function buildGenerationRecallTransactionId(chatId, generationType, recallKey) { + return [ + String(chatId || ""), + String(generationType || "normal").trim() || "normal", + String(recallKey || ""), + ].join(":"); +} + +function beginGenerationRecallTransaction({ + chatId, + generationType = "normal", + recallKey = "", +} = {}) { + const normalizedChatId = String(chatId || ""); + const normalizedGenerationType = + String(generationType || "normal").trim() || "normal"; + const normalizedRecallKey = String(recallKey || ""); + if (!normalizedChatId || !normalizedRecallKey) return null; + + cleanupGenerationRecallTransactions(); + const transactionId = buildGenerationRecallTransactionId( + normalizedChatId, + normalizedGenerationType, + normalizedRecallKey, + ); + const now = Date.now(); + const transaction = generationRecallTransactions.get(transactionId) || { + id: transactionId, + chatId: normalizedChatId, + generationType: normalizedGenerationType, + recallKey: normalizedRecallKey, + hookStates: {}, + createdAt: now, + }; + transaction.updatedAt = now; 
+ generationRecallTransactions.set(transactionId, transaction); + return transaction; +} + +function markGenerationRecallTransactionHookState( + transaction, + hookName, + state = "completed", +) { + if (!transaction?.id || !hookName) return transaction; + transaction.hookStates ||= {}; + transaction.hookStates[hookName] = state; + transaction.updatedAt = Date.now(); + generationRecallTransactions.set(transaction.id, transaction); + return transaction; +} + +function shouldRunRecallForTransaction(transaction, hookName) { + if (!hookName) return true; + if (!transaction) return true; + const hookStates = transaction.hookStates || {}; + if (hookStates[hookName] === "completed") { + return false; + } + if ( + hookName === "GENERATE_BEFORE_COMBINE_PROMPTS" && + hookStates.GENERATION_AFTER_COMMANDS === "completed" + ) { + return false; + } + return true; +} + +function createGenerationRecallContext({ + hookName, + generationType = "normal", + recallOptions = {}, + chatId = getCurrentChatId(), +} = {}) { + const recallKey = + recallOptions.recallKey || + buildPreGenerationRecallKey(generationType, recallOptions); + const transaction = beginGenerationRecallTransaction({ + chatId, + generationType, + recallKey, + }); + return { + hookName, + generationType, + recallKey, + transaction, + shouldRun: shouldRunRecallForTransaction(transaction, hookName), + }; +} + function getCurrentChatSeq(context = getContext()) { const chat = context?.chat; if (Array.isArray(chat) && chat.length > 0) { @@ -1656,19 +1772,129 @@ function getCurrentChatSeq(context = getContext()) { return currentGraph?.lastProcessedSeq ?? 
0; } +const BATCH_STAGE_ORDER = ["core", "structural", "semantic", "finalize"]; +const BATCH_STAGE_SEVERITY = { + success: 0, + partial: 1, + failed: 2, +}; + +function createBatchStageStatus(stage, consistency = "strong") { + return { + stage, + outcome: "success", + consistency, + warnings: [], + errors: [], + artifacts: [], + }; +} + +function createBatchStatusSkeleton({ processedRange, extractionCountBefore }) { + return { + model: "layered-batch-v1", + processedRange: Array.isArray(processedRange) + ? [...processedRange] + : [-1, -1], + extractionCountBefore: Number.isFinite(extractionCountBefore) + ? extractionCountBefore + : extractionCount, + extractionCountAfter: Number.isFinite(extractionCount) + ? extractionCount + : 0, + stages: { + core: createBatchStageStatus("core", "strong"), + structural: createBatchStageStatus("structural", "weak"), + semantic: createBatchStageStatus("semantic", "weak"), + finalize: createBatchStageStatus("finalize", "strong"), + }, + outcome: "success", + consistency: "strong", + completed: false, + warnings: [], + errors: [], + }; +} + +function setBatchStageOutcome(status, stage, outcome, message = "") { + const stageStatus = status?.stages?.[stage]; + if (!stageStatus) return; + const nextSeverity = BATCH_STAGE_SEVERITY[outcome] ?? 0; + const previousSeverity = BATCH_STAGE_SEVERITY[stageStatus.outcome] ?? 
0; + if (nextSeverity >= previousSeverity) { + stageStatus.outcome = outcome; + } + if (!message) return; + if (outcome === "failed") { + stageStatus.errors.push(message); + } else if (outcome === "partial") { + stageStatus.warnings.push(message); + } +} + +function pushBatchStageArtifact(status, stage, artifact) { + const stageStatus = status?.stages?.[stage]; + if (!stageStatus || !artifact) return; + if (!stageStatus.artifacts.includes(artifact)) { + stageStatus.artifacts.push(artifact); + } +} + +function finalizeBatchStatus(status) { + const stages = status?.stages || {}; + const structuralOutcome = stages.structural?.outcome || "success"; + const semanticOutcome = stages.semantic?.outcome || "success"; + const finalizeOutcome = stages.finalize?.outcome || "failed"; + const outcomeList = BATCH_STAGE_ORDER.map( + (stage) => stages[stage]?.outcome || "success", + ); + + if (finalizeOutcome !== "success") { + status.outcome = "failed"; + } else if (outcomeList.includes("failed")) { + status.outcome = "failed"; + } else if (structuralOutcome === "partial" || semanticOutcome === "partial") { + status.outcome = "partial"; + } else { + status.outcome = "success"; + } + + status.consistency = + finalizeOutcome === "success" && + stages.core?.outcome === "success" && + stages.structural?.outcome === "success" + ? "strong" + : "weak"; + status.completed = finalizeOutcome === "success"; + status.extractionCountAfter = Number.isFinite(extractionCount) + ? 
extractionCount + : status.extractionCountAfter; + status.warnings = BATCH_STAGE_ORDER.flatMap( + (stage) => stages[stage]?.warnings || [], + ); + status.errors = BATCH_STAGE_ORDER.flatMap( + (stage) => stages[stage]?.errors || [], + ); + return status; +} + async function handleExtractionSuccess( result, endIdx, settings, signal = undefined, + status = createBatchStatusSkeleton({ + processedRange: [endIdx, endIdx], + extractionCountBefore: extractionCount, + }), ) { const postProcessArtifacts = []; - const warnings = []; throwIfAborted(signal, "提取已终止"); extractionCount++; ensureCurrentGraphRuntimeState(); currentGraph.historyState.extractionCount = extractionCount; updateLastExtractedItems(result.newNodeIds || []); + setBatchStageOutcome(status, "core", "success"); if (settings.enableConsolidation && result.newNodeIds?.length > 0) { try { @@ -1684,8 +1910,16 @@ async function handleExtractionSuccess( signal, }); postProcessArtifacts.push("consolidation"); + pushBatchStageArtifact(status, "structural", "consolidation"); } catch (e) { if (isAbortError(e)) throw e; + const message = e?.message || String(e) || "记忆整合阶段失败"; + setBatchStageOutcome( + status, + "structural", + "partial", + `记忆整合失败: ${message}`, + ); console.error("[ST-BME] 记忆整合失败:", e); } } @@ -1703,8 +1937,16 @@ async function handleExtractionSuccess( signal, }); postProcessArtifacts.push("synopsis"); + pushBatchStageArtifact(status, "semantic", "synopsis"); } catch (e) { if (isAbortError(e)) throw e; + const message = e?.message || String(e) || "概要生成阶段失败"; + setBatchStageOutcome( + status, + "semantic", + "failed", + `概要生成失败: ${message}`, + ); console.error("[ST-BME] 概要生成失败:", e); } } @@ -1721,8 +1963,16 @@ async function handleExtractionSuccess( signal, }); postProcessArtifacts.push("reflection"); + pushBatchStageArtifact(status, "semantic", "reflection"); } catch (e) { if (isAbortError(e)) throw e; + const message = e?.message || String(e) || "反思生成阶段失败"; + setBatchStageOutcome( + status, + "semantic", 
+ "failed", + `反思生成失败: ${message}`, + ); console.error("[ST-BME] 反思生成失败:", e); } } @@ -1734,7 +1984,15 @@ async function handleExtractionSuccess( try { sleepCycle(currentGraph, settings); postProcessArtifacts.push("sleep"); + pushBatchStageArtifact(status, "semantic", "sleep"); } catch (e) { + const message = e?.message || String(e) || "主动遗忘阶段失败"; + setBatchStageOutcome( + status, + "semantic", + "failed", + `主动遗忘失败: ${message}`, + ); console.error("[ST-BME] 主动遗忘失败:", e); } } @@ -1752,27 +2010,63 @@ async function handleExtractionSuccess( ); if (compressionResult.created > 0 || compressionResult.archived > 0) { postProcessArtifacts.push("compression"); + pushBatchStageArtifact(status, "structural", "compression"); } } catch (error) { if (isAbortError(error)) throw error; const message = error?.message || String(error) || "压缩阶段失败"; - warnings.push(`压缩阶段失败: ${message}`); + setBatchStageOutcome( + status, + "structural", + "partial", + `压缩阶段失败: ${message}`, + ); console.error("[ST-BME] 记忆压缩失败:", error); } - const vectorSync = await syncVectorState({ signal }); + let vectorSync = null; + try { + vectorSync = await syncVectorState({ signal }); + } catch (error) { + if (isAbortError(error)) throw error; + const message = error?.message || String(error) || "向量同步阶段失败"; + setBatchStageOutcome( + status, + "finalize", + "failed", + `向量同步失败: ${message}`, + ); + return { + postProcessArtifacts, + vectorHashesInserted: [], + vectorStats: getVectorIndexStats(currentGraph), + vectorError: message, + warnings: status.warnings, + batchStatus: finalizeBatchStatus(status), + }; + } + if (vectorSync?.aborted) { throw createAbortError(vectorSync.error || "提取已终止"); } if (vectorSync?.error) { - warnings.push(`向量同步失败: ${vectorSync.error}`); + setBatchStageOutcome( + status, + "finalize", + "failed", + `向量同步失败: ${vectorSync.error}`, + ); + } else { + setBatchStageOutcome(status, "finalize", "success"); } + return { postProcessArtifacts, vectorHashesInserted: vectorSync?.insertedHashes || 
[], vectorStats: vectorSync?.stats || getVectorIndexStats(currentGraph), vectorError: vectorSync?.error || "", - warnings, + warnings: status.warnings, + batchStatus: finalizeBatchStatus(status), }; } @@ -2160,6 +2454,15 @@ async function prepareVectorStateForReplay( currentGraph.vectorIndexState.nodeToHash = {}; } currentGraph.vectorIndexState.dirty = true; + if (!currentGraph.vectorIndexState.dirtyReason) { + currentGraph.vectorIndexState.dirtyReason = skipBackendPurge + ? "history-recovery-replay" + : "history-recovery-reset"; + } + if (fullReset) { + currentGraph.vectorIndexState.replayRequiredNodeIds = []; + currentGraph.vectorIndexState.pendingRepairFromFloor = 0; + } currentGraph.vectorIndexState.lastWarning = skipBackendPurge ? "历史恢复后需要修复受影响后缀的后端向量索引" : "历史恢复后需要重建后端向量索引"; @@ -2169,7 +2472,10 @@ async function prepareVectorStateForReplay( if (fullReset) { currentGraph.vectorIndexState.hashToNodeId = {}; currentGraph.vectorIndexState.nodeToHash = {}; + currentGraph.vectorIndexState.replayRequiredNodeIds = []; currentGraph.vectorIndexState.dirty = true; + currentGraph.vectorIndexState.dirtyReason = "history-recovery-reset"; + currentGraph.vectorIndexState.pendingRepairFromFloor = 0; currentGraph.vectorIndexState.lastWarning = "历史恢复后需要重嵌当前聊天向量"; } @@ -2189,6 +2495,10 @@ async function executeExtractionBatch({ const extractionCountBefore = extractionCount; const beforeSnapshot = cloneGraphSnapshot(currentGraph); const messages = buildExtractionMessages(chat, startIdx, endIdx, settings); + const batchStatus = createBatchStatusSkeleton({ + processedRange: [startIdx, endIdx], + extractionCountBefore, + }); console.log( `[ST-BME] 开始提取: 楼层 ${startIdx}-${endIdx}` + @@ -2211,21 +2521,41 @@ async function executeExtractionBatch({ }); if (!result.success) { + setBatchStageOutcome( + batchStatus, + "core", + "failed", + result?.error || "提取阶段未返回有效操作", + ); + finalizeBatchStatus(batchStatus); + currentGraph.historyState.lastBatchStatus = batchStatus; return { success: 
false, result, effects: null, + batchStatus, error: result?.error || "提取阶段未返回有效操作", }; } + setBatchStageOutcome(batchStatus, "core", "success"); const effects = await handleExtractionSuccess( result, endIdx, settings, signal, + batchStatus, ); - updateProcessedHistorySnapshot(chat, endIdx); + const finalizedBatchStatus = + effects?.batchStatus || finalizeBatchStatus(batchStatus); + currentGraph.historyState.lastBatchStatus = { + ...finalizedBatchStatus, + historyAdvanced: shouldAdvanceProcessedHistory(finalizedBatchStatus), + }; + + if (currentGraph.historyState.lastBatchStatus.historyAdvanced) { + updateProcessedHistorySnapshot(chat, endIdx); + } const afterSnapshot = cloneGraphSnapshot(currentGraph); const postProcessArtifacts = computePostProcessArtifacts( @@ -2245,10 +2575,15 @@ async function executeExtractionBatch({ saveGraphToChat(); return { - success: true, + success: finalizedBatchStatus.completed, result, effects, - error: effects?.vectorError || "", + batchStatus: finalizedBatchStatus, + error: finalizedBatchStatus.completed + ? "" + : effects?.vectorError || + finalizedBatchStatus.errors?.[0] || + "批次未完成 finalize 闭环", }; } @@ -2289,18 +2624,41 @@ async function replayExtractionFromHistory(chat, settings, signal = undefined) { return replayedBatches; } -function collectAffectedInsertedHashes(affectedJournals = []) { - const hashes = new Set(); - for (const journal of affectedJournals) { - const insertedHashes = - journal?.vectorDelta?.insertedHashes || - journal?.vectorHashesInserted || - []; - for (const hash of insertedHashes) { - if (hash) hashes.add(hash); - } +function applyRecoveryPlanToVectorState( + recoveryPlan, + dirtyFallbackFloor = null, +) { + ensureCurrentGraphRuntimeState(); + const vectorState = currentGraph.vectorIndexState; + const replayRequiredNodeIds = new Set( + Array.isArray(vectorState.replayRequiredNodeIds) + ? 
vectorState.replayRequiredNodeIds.filter(Boolean) + : [], + ); + + for (const nodeId of recoveryPlan?.replayRequiredNodeIds || []) { + if (nodeId) replayRequiredNodeIds.add(nodeId); } - return [...hashes]; + + vectorState.replayRequiredNodeIds = [...replayRequiredNodeIds]; + vectorState.dirty = true; + vectorState.dirtyReason = + recoveryPlan?.dirtyReason || + vectorState.dirtyReason || + "history-recovery-replay"; + const fallbackFloor = Number.isFinite(dirtyFallbackFloor) + ? dirtyFallbackFloor + : currentGraph.historyState?.historyDirtyFrom; + vectorState.pendingRepairFromFloor = Number.isFinite( + recoveryPlan?.pendingRepairFromFloor, + ) + ? recoveryPlan.pendingRepairFromFloor + : Number.isFinite(fallbackFloor) + ? fallbackFloor + : null; + vectorState.lastWarning = recoveryPlan?.legacyGapFallback + ? "历史恢复检测到 legacy-gap,向量索引需按受影响后缀修复" + : "历史恢复后需要修复受影响后缀的向量索引"; } function rollbackAffectedJournals(graph, affectedJournals = []) { @@ -2370,18 +2728,23 @@ async function recoverHistoryIfNeeded(trigger = "history-recovery") { recoveryPath = "reverse-journal"; affectedBatchCount = recoveryPoint.affectedBatchCount || 0; const config = getEmbeddingConfig(); - const insertedHashes = collectAffectedInsertedHashes( + const recoveryPlan = buildReverseJournalRecoveryPlan( recoveryPoint.affectedJournals, + initialDirtyFrom, ); rollbackAffectedJournals(currentGraph, recoveryPoint.affectedJournals); currentGraph = normalizeGraphRuntimeState(currentGraph, chatId); extractionCount = currentGraph.historyState.extractionCount || 0; + applyRecoveryPlanToVectorState(recoveryPlan, initialDirtyFrom); - if (isBackendVectorConfig(config) && insertedHashes.length > 0) { + if ( + isBackendVectorConfig(config) && + recoveryPlan.backendDeleteHashes.length > 0 + ) { await deleteBackendVectorHashesForRecovery( currentGraph.vectorIndexState.collectionId, config, - insertedHashes, + recoveryPlan.backendDeleteHashes, historySignal, ); } @@ -2926,33 +3289,58 @@ async function 
onGenerationAfterCommands(type, params = {}, dryRun = false) { ); if (!recallOptions?.overrideUserMessage) return; - const recallKey = buildPreGenerationRecallKey(type, recallOptions); - const recentlyHandled = - lastPreGenerationRecallKey === recallKey && - Date.now() - lastPreGenerationRecallAt < 1500; - if (recentlyHandled) { + const recallContext = createGenerationRecallContext({ + hookName: "GENERATION_AFTER_COMMANDS", + generationType: String(type || "normal").trim() || "normal", + recallOptions, + }); + if (!recallContext.shouldRun) { return; } + markGenerationRecallTransactionHookState( + recallContext.transaction, + recallContext.hookName, + "running", + ); const didRecall = await runRecall({ ...recallOptions, - hookName: "GENERATION_AFTER_COMMANDS", + recallKey: recallContext.recallKey, + hookName: recallContext.hookName, signal: params?.signal, }); - if (didRecall) { - lastPreGenerationRecallKey = recallKey; - lastPreGenerationRecallAt = Date.now(); - skipBeforeCombineRecallUntil = Date.now() + 1500; - } + markGenerationRecallTransactionHookState( + recallContext.transaction, + recallContext.hookName, + didRecall ? "completed" : "pending", + ); } async function onBeforeCombinePrompts() { - if (skipBeforeCombineRecallUntil > Date.now()) { - skipBeforeCombineRecallUntil = 0; + const recallContext = createGenerationRecallContext({ + hookName: "GENERATE_BEFORE_COMBINE_PROMPTS", + generationType: "normal", + recallOptions: {}, + }); + if (!recallContext.shouldRun) { return; } - await runRecall({ hookName: "GENERATE_BEFORE_COMBINE_PROMPTS" }); + + markGenerationRecallTransactionHookState( + recallContext.transaction, + recallContext.hookName, + "running", + ); + const didRecall = await runRecall({ + recallKey: recallContext.recallKey, + hookName: recallContext.hookName, + }); + markGenerationRecallTransactionHookState( + recallContext.transaction, + recallContext.hookName, + didRecall ? 
"completed" : "pending", + ); } function onMessageReceived() { @@ -3502,6 +3890,8 @@ async function onReembedDirect() { getLastExtractionStatus: () => lastExtractionStatus, getLastVectorStatus: () => lastVectorStatus, getLastRecallStatus: () => lastRecallStatus, + getLastBatchStatus: () => + currentGraph?.historyState?.lastBatchStatus || null, getLastInjection: () => lastInjectionContent, updateSettings: (patch) => { const settings = updateModuleSettings(patch); diff --git a/runtime-state.js b/runtime-state.js index 45b7733..330673c 100644 --- a/runtime-state.js +++ b/runtime-state.js @@ -17,6 +17,7 @@ export function createDefaultHistoryState(chatId = "") { lastMutationSource: "", extractionCount: 0, lastRecoveryResult: null, + lastBatchStatus: null, }; } @@ -29,6 +30,9 @@ export function createDefaultVectorIndexState(chatId = "") { hashToNodeId: {}, nodeToHash: {}, dirty: false, + replayRequiredNodeIds: [], + dirtyReason: "", + pendingRepairFromFloor: null, lastSyncAt: 0, lastStats: { total: 0, @@ -60,7 +64,9 @@ export function normalizeGraphRuntimeState(graph, chatId = "") { historyState.chatId = chatId || historyState.chatId || ""; if (!Number.isFinite(historyState.lastProcessedAssistantFloor)) { - historyState.lastProcessedAssistantFloor = Number.isFinite(graph.lastProcessedSeq) + historyState.lastProcessedAssistantFloor = Number.isFinite( + graph.lastProcessedSeq, + ) ? 
graph.lastProcessedSeq : -1; } @@ -70,6 +76,20 @@ export function normalizeGraphRuntimeState(graph, chatId = "") { if (typeof historyState.lastMutationSource !== "string") { historyState.lastMutationSource = ""; } + if ( + !historyState.lastBatchStatus || + typeof historyState.lastBatchStatus !== "object" || + Array.isArray(historyState.lastBatchStatus) + ) { + historyState.lastBatchStatus = null; + } else if ( + typeof historyState.lastBatchStatus.historyAdvanced !== "boolean" + ) { + historyState.lastBatchStatus = { + ...historyState.lastBatchStatus, + historyAdvanced: false, + }; + } if ( !historyState.processedMessageHashes || @@ -93,17 +113,42 @@ export function normalizeGraphRuntimeState(graph, chatId = "") { ) { vectorIndexState.nodeToHash = {}; } - if (!vectorIndexState.lastStats || typeof vectorIndexState.lastStats !== "object") { - vectorIndexState.lastStats = createDefaultVectorIndexState(chatId).lastStats; + if ( + !vectorIndexState.lastStats || + typeof vectorIndexState.lastStats !== "object" + ) { + vectorIndexState.lastStats = + createDefaultVectorIndexState(chatId).lastStats; + } + if (!Array.isArray(vectorIndexState.replayRequiredNodeIds)) { + vectorIndexState.replayRequiredNodeIds = []; + } else { + vectorIndexState.replayRequiredNodeIds = [ + ...new Set(vectorIndexState.replayRequiredNodeIds.filter(Boolean)), + ]; + } + if (typeof vectorIndexState.dirtyReason !== "string") { + vectorIndexState.dirtyReason = ""; + } + if (!Number.isFinite(vectorIndexState.pendingRepairFromFloor)) { + vectorIndexState.pendingRepairFromFloor = null; } const previousCollectionId = vectorIndexState.collectionId; - vectorIndexState.collectionId = buildVectorCollectionId(chatId || historyState.chatId); + vectorIndexState.collectionId = buildVectorCollectionId( + chatId || historyState.chatId, + ); - if (previousCollectionId && previousCollectionId !== vectorIndexState.collectionId) { + if ( + previousCollectionId && + previousCollectionId !== 
/**
 * Normalizes a list of identifiers into a de-duplicated array of strings.
 *
 * Falsy entries are dropped; first-seen order is preserved.
 *
 * @param {unknown} values - Candidate list; anything that is not an array yields `[]`.
 * @returns {string[]} Unique, stringified, truthy entries.
 */
function normalizeStringArray(values) {
  if (!Array.isArray(values)) return [];
  return [...new Set(values.filter(Boolean).map((value) => String(value)))];
}

/**
 * Normalizes a list of `{ nodeId, previousHash, nextHash }` mapping records.
 *
 * Each field is coerced to a string (missing/falsy becomes `""`), records with
 * all three fields empty are dropped, and exact duplicates are removed while
 * keeping first-seen order.
 *
 * @param {unknown} values - Candidate list; anything that is not an array yields `[]`.
 * @returns {{nodeId: string, previousHash: string, nextHash: string}[]}
 */
function normalizeMappingArray(values) {
  if (!Array.isArray(values)) return [];
  const seen = new Set();
  const mappings = [];
  for (const entry of values) {
    if (!entry || typeof entry !== "object") continue;
    const nodeId = entry.nodeId ? String(entry.nodeId) : "";
    const previousHash = entry.previousHash ? String(entry.previousHash) : "";
    const nextHash = entry.nextHash ? String(entry.nextHash) : "";
    if (!nodeId && !previousHash && !nextHash) continue;
    const key = JSON.stringify([nodeId, previousHash, nextHash]);
    if (seen.has(key)) continue;
    seen.add(key);
    mappings.push({ nodeId, previousHash, nextHash });
  }
  return mappings;
}

/**
 * Computes the vector-index delta between two graph snapshots.
 *
 * The result merges what the caller reported in `meta` with what can be
 * derived by diffing `vectorIndexState.hashToNodeId` and
 * `vectorIndexState.nodeToHash` of both snapshots.
 *
 * @param {object|null|undefined} snapshotBefore - Snapshot taken before the batch.
 * @param {object|null|undefined} snapshotAfter - Snapshot taken after the batch.
 * @param {object} [meta={}] - Optional caller-supplied hints:
 *   `vectorHashesInserted`, `vectorHashesRemoved`, `vectorTouchedNodeIds`,
 *   `vectorReplayRequiredNodeIds`, `vectorBackendDeleteHashes`,
 *   `vectorReplacedMappings`.
 * @returns {{
 *   insertedHashes: string[],
 *   removedHashes: string[],
 *   replacedMappings: {nodeId: string, previousHash: string, nextHash: string}[],
 *   touchedNodeIds: string[],
 *   replayRequiredNodeIds: string[],
 *   backendDeleteHashes: string[],
 * }}
 */
function buildVectorDelta(snapshotBefore, snapshotAfter, meta = {}) {
  const beforeState = snapshotBefore?.vectorIndexState || {};
  const afterState = snapshotAfter?.vectorIndexState || {};
  const beforeNodeToHash = beforeState.nodeToHash || {};
  const afterNodeToHash = afterState.nodeToHash || {};

  // hashToNodeId is keyed by hash, so the hash sets must be built from the
  // KEYS. Building them from the values (node ids) made every membership test
  // below miss, which reported unchanged hashes as both inserted and removed.
  const beforeHashes = Object.keys(beforeState.hashToNodeId || {});
  const afterHashes = Object.keys(afterState.hashToNodeId || {});
  const beforeHashSet = new Set(beforeHashes);
  const afterHashSet = new Set(afterHashes);

  const insertedHashes = new Set(
    normalizeStringArray(meta.vectorHashesInserted),
  );
  const removedHashes = new Set(normalizeStringArray(meta.vectorHashesRemoved));
  const touchedNodeIds = new Set(
    normalizeStringArray(meta.vectorTouchedNodeIds),
  );
  const replayRequiredNodeIds = new Set(
    normalizeStringArray(meta.vectorReplayRequiredNodeIds),
  );
  const backendDeleteHashes = new Set(
    normalizeStringArray(meta.vectorBackendDeleteHashes),
  );
  const replacedMappings = normalizeMappingArray(meta.vectorReplacedMappings);

  // Track already-recorded mappings by key so the per-node duplicate check is
  // O(1) instead of re-serializing the whole list for every node.
  const mappingKey = (nodeId, previousHash, nextHash) =>
    JSON.stringify([nodeId, previousHash, nextHash]);
  const knownMappingKeys = new Set(
    replacedMappings.map((entry) =>
      mappingKey(entry.nodeId, entry.previousHash, entry.nextHash),
    ),
  );

  for (const hash of afterHashes) {
    if (!beforeHashSet.has(hash)) insertedHashes.add(hash);
  }
  for (const hash of beforeHashes) {
    if (!afterHashSet.has(hash)) removedHashes.add(hash);
  }

  const nodeIds = new Set([
    ...Object.keys(beforeNodeToHash),
    ...Object.keys(afterNodeToHash),
  ]);
  for (const nodeId of nodeIds) {
    const previousHash = beforeNodeToHash[nodeId]
      ? String(beforeNodeToHash[nodeId])
      : "";
    const nextHash = afterNodeToHash[nodeId]
      ? String(afterNodeToHash[nodeId])
      : "";
    if (previousHash === nextHash) continue;

    // From here on at least one of the two hashes is non-empty.
    touchedNodeIds.add(String(nodeId));
    if (previousHash) {
      removedHashes.add(previousHash);
      // The superseded vector must also be dropped from the backend store.
      backendDeleteHashes.add(previousHash);
    }
    if (nextHash) {
      insertedHashes.add(nextHash);
    }

    const key = mappingKey(String(nodeId), previousHash, nextHash);
    if (!knownMappingKeys.has(key)) {
      knownMappingKeys.add(key);
      replacedMappings.push({
        nodeId: String(nodeId),
        previousHash,
        nextHash,
      });
    }
  }

  // Nodes still flagged for replay in the post-batch state stay flagged.
  for (const nodeId of normalizeStringArray(afterState.replayRequiredNodeIds)) {
    replayRequiredNodeIds.add(nodeId);
  }

  return {
    insertedHashes: [...insertedHashes],
    removedHashes: [...removedHashes],
    replacedMappings,
    touchedNodeIds: [...touchedNodeIds],
    replayRequiredNodeIds: [...replayRequiredNodeIds],
    backendDeleteHashes: [...backendDeleteHashes],
  };
}
0), }; } -export function createBatchJournalEntry(snapshotBefore, snapshotAfter, meta = {}) { +export function createBatchJournalEntry( + snapshotBefore, + snapshotAfter, + meta = {}, +) { const beforeNodes = buildNodeMap(snapshotBefore?.nodes || []); const afterNodes = buildNodeMap(snapshotAfter?.nodes || []); const beforeEdges = buildEdgeMap(snapshotBefore?.edges || []); @@ -333,11 +494,7 @@ export function createBatchJournalEntry(snapshotBefore, snapshotAfter, meta = {} previousNodeSnapshots, previousEdgeSnapshots, stateBefore: buildJournalStateBefore(snapshotBefore, meta), - vectorDelta: { - insertedHashes: Array.isArray(meta.vectorHashesInserted) - ? [...new Set(meta.vectorHashesInserted)] - : [], - }, + vectorDelta: buildVectorDelta(snapshotBefore, snapshotAfter, meta), postProcessArtifacts: Array.isArray(meta.postProcessArtifacts) ? meta.postProcessArtifacts : [], @@ -427,9 +584,7 @@ export function rollbackBatch(graph, journal) { journal.archivedNodeSnapshots || []; const previousEdgeSnapshots = - journal.previousEdgeSnapshots || - journal.invalidatedEdgeSnapshots || - []; + journal.previousEdgeSnapshots || journal.invalidatedEdgeSnapshots || []; graph.edges = (graph.edges || []).filter( (edge) => @@ -437,7 +592,9 @@ export function rollbackBatch(graph, journal) { !createdNodeIds.has(edge.fromId) && !createdNodeIds.has(edge.toId), ); - graph.nodes = (graph.nodes || []).filter((node) => !createdNodeIds.has(node.id)); + graph.nodes = (graph.nodes || []).filter( + (node) => !createdNodeIds.has(node.id), + ); for (const nodeSnapshot of previousNodeSnapshots) { upsertById(graph.nodes, cloneGraphSnapshot(nodeSnapshot)); @@ -470,7 +627,9 @@ export function findJournalRecoveryPoint(graph, dirtyFromFloor) { return { path: "reverse-journal", affectedIndex, - affectedJournals: affectedJournals.map((journal) => cloneGraphSnapshot(journal)), + affectedJournals: affectedJournals.map((journal) => + cloneGraphSnapshot(journal), + ), affectedBatchCount: 
affectedJournals.length, }; } @@ -489,6 +648,92 @@ export function findJournalRecoveryPoint(graph, dirtyFromFloor) { return null; } +export function buildReverseJournalRecoveryPlan( + affectedJournals = [], + dirtyFromFloor = null, +) { + const backendDeleteHashes = new Set(); + const replayRequiredNodeIds = new Set(); + const touchedNodeIds = new Set(); + let hasLegacyGap = false; + let minProcessedFloor = Number.isFinite(dirtyFromFloor) + ? dirtyFromFloor + : null; + + for (const journal of affectedJournals) { + const vectorDelta = journal?.vectorDelta || {}; + const insertedHashes = normalizeStringArray( + vectorDelta.insertedHashes || journal?.vectorHashesInserted || [], + ); + const removedHashes = normalizeStringArray(vectorDelta.removedHashes); + const backendDeletes = normalizeStringArray( + vectorDelta.backendDeleteHashes, + ); + const touchedNodes = normalizeStringArray(vectorDelta.touchedNodeIds); + const replayNodes = normalizeStringArray(vectorDelta.replayRequiredNodeIds); + const replacedMappings = normalizeMappingArray( + vectorDelta.replacedMappings, + ); + const range = Array.isArray(journal?.processedRange) + ? journal.processedRange + : [-1, -1]; + + if (Number.isFinite(range[0])) { + minProcessedFloor = Number.isFinite(minProcessedFloor) + ? 
Math.min(minProcessedFloor, range[0]) + : range[0]; + } + + for (const hash of insertedHashes) { + backendDeleteHashes.add(hash); + } + for (const hash of removedHashes) { + backendDeleteHashes.add(hash); + } + for (const hash of backendDeletes) { + backendDeleteHashes.add(hash); + } + for (const nodeId of touchedNodes) { + touchedNodeIds.add(nodeId); + replayRequiredNodeIds.add(nodeId); + } + for (const nodeId of replayNodes) { + replayRequiredNodeIds.add(nodeId); + } + for (const entry of replacedMappings) { + if (entry.nodeId) { + touchedNodeIds.add(entry.nodeId); + replayRequiredNodeIds.add(entry.nodeId); + } + if (entry.previousHash) backendDeleteHashes.add(entry.previousHash); + if (entry.nextHash) backendDeleteHashes.add(entry.nextHash); + } + + if ( + !Array.isArray(vectorDelta.removedHashes) || + !Array.isArray(vectorDelta.replacedMappings) || + !Array.isArray(vectorDelta.touchedNodeIds) || + !Array.isArray(vectorDelta.replayRequiredNodeIds) || + !Array.isArray(vectorDelta.backendDeleteHashes) + ) { + hasLegacyGap = true; + } + } + + const pendingRepairFromFloor = Number.isFinite(minProcessedFloor) + ? minProcessedFloor + : null; + + return { + backendDeleteHashes: [...backendDeleteHashes], + replayRequiredNodeIds: [...replayRequiredNodeIds], + touchedNodeIds: [...touchedNodeIds], + pendingRepairFromFloor, + legacyGapFallback: hasLegacyGap, + dirtyReason: hasLegacyGap ? 
"legacy-gap" : "history-recovery-replay", + }; +} + export function buildRecoveryResult(status, extra = {}) { return { status, diff --git a/tests/p0-regressions.mjs b/tests/p0-regressions.mjs new file mode 100644 index 0000000..8f0360f --- /dev/null +++ b/tests/p0-regressions.mjs @@ -0,0 +1,1015 @@ +import assert from "node:assert/strict"; +import fs from "node:fs/promises"; +import { createRequire } from "node:module"; +import path from "node:path"; +import vm from "node:vm"; + +const require = createRequire(import.meta.url); +const originalRequire = globalThis.require; +globalThis.require = require; + +const { createEmptyGraph, createNode, addNode, createEdge, addEdge } = + await import("../graph.js"); +const { compressType } = await import("../compressor.js"); +const { syncGraphVectorIndex } = await import("../vector-index.js"); +const { extractMemories } = await import("../extractor.js"); +const { consolidateMemories } = await import("../consolidator.js"); +const { + createBatchJournalEntry, + buildReverseJournalRecoveryPlan, + normalizeGraphRuntimeState, + rollbackBatch, +} = await import("../runtime-state.js"); +const llm = await import("../llm.js"); +const embedding = await import("../embedding.js"); + +if (originalRequire === undefined) { + delete globalThis.require; +} else { + globalThis.require = originalRequire; +} + +const schema = [ + { + id: "event", + label: "事件", + columns: [ + { name: "title" }, + { name: "summary" }, + { name: "participants" }, + { name: "status" }, + ], + compression: { + mode: "hierarchical", + threshold: 2, + }, + }, + { + id: "character", + label: "角色", + columns: [{ name: "name" }, { name: "state" }], + latestOnly: true, + }, +]; + +function createBatchStageHarness() { + const indexPath = path.resolve("./index.js"); + return fs.readFile(indexPath, "utf8").then((source) => { + const marker = "function isAssistantChatMessage(message) {"; + const start = source.indexOf("const BATCH_STAGE_ORDER ="); + const end = 
source.indexOf(marker); + if (start < 0 || end < 0 || end <= start) { + throw new Error("无法从 index.js 提取批次状态机定义"); + } + const snippet = source.slice(start, end); + const context = { + console, + result: null, + extractionCount: 0, + currentGraph: null, + consolidateMemories: async () => {}, + generateSynopsis: async () => {}, + generateReflection: async () => {}, + sleepCycle: () => {}, + compressAll: async () => ({ created: 0, archived: 0 }), + syncVectorState: async () => ({ + insertedHashes: [], + stats: { pending: 0 }, + }), + getSchema: () => schema, + getEmbeddingConfig: () => null, + getVectorIndexStats: () => ({ pending: 0 }), + updateLastExtractedItems: () => {}, + ensureCurrentGraphRuntimeState: () => {}, + throwIfAborted: () => {}, + isAbortError: () => false, + createAbortError: (message) => new Error(message), + }; + vm.createContext(context); + vm.runInContext( + `${snippet}\nresult = { createBatchStatusSkeleton, finalizeBatchStatus, handleExtractionSuccess, setBatchStageOutcome, shouldAdvanceProcessedHistory };`, + context, + { filename: indexPath }, + ); + return context; + }); +} + +function createGenerationRecallHarness() { + const indexPath = path.resolve("./index.js"); + return fs.readFile(indexPath, "utf8").then((source) => { + const start = source.indexOf("const RECALL_INPUT_RECORD_TTL_MS = 60000;"); + const end = source.indexOf("function onMessageReceived() {"); + if (start < 0 || end < 0 || end <= start) { + throw new Error("无法从 index.js 提取生成召回事务定义"); + } + const snippet = source.slice(start, end); + const context = { + console, + Date, + Map, + setTimeout, + clearTimeout, + result: null, + currentGraph: {}, + isRecalling: false, + getCurrentChatId: () => "chat-main", + normalizeRecallInputText: (text = "") => String(text || "").trim(), + pendingRecallSendIntent: { text: "", hash: "", at: 0 }, + lastRecallSentUserMessage: { text: "", hash: "", at: 0 }, + getLatestUserChatMessage: (chat = []) => + [...chat].reverse().find((message) => 
message?.is_user) || null, + getLastNonSystemChatMessage: (chat = []) => + [...chat].reverse().find((message) => !message?.is_system) || null, + getSendTextareaValue: () => "", + getRecallUserMessageSourceLabel: (source = "") => source, + buildRecallRecentMessages: ( + chat = [], + _limit, + syntheticUserMessage = "", + ) => + syntheticUserMessage + ? [...chat, { is_user: true, mes: syntheticUserMessage }] + : [...chat], + getContext: () => ({ + chat: context.chat, + }), + chat: [], + runRecallCalls: [], + runRecall: async (options = {}) => { + context.runRecallCalls.push({ ...options }); + return true; + }, + }; + vm.createContext(context); + vm.runInContext( + `${snippet}\nresult = { hashRecallInput, buildPreGenerationRecallKey, buildGenerationAfterCommandsRecallInput, cleanupGenerationRecallTransactions, buildGenerationRecallTransactionId, beginGenerationRecallTransaction, markGenerationRecallTransactionHookState, shouldRunRecallForTransaction, createGenerationRecallContext, onGenerationAfterCommands, onBeforeCombinePrompts, generationRecallTransactions };`, + context, + { filename: indexPath }, + ); + return context; + }); +} + +function makeEvent(seq, title) { + return createNode({ + type: "event", + seq, + fields: { + title, + summary: `${title} 摘要`, + participants: "Alice", + status: "active", + }, + }); +} + +async function testCompressorMigratesEdgesToCompressedNode() { + const graph = createEmptyGraph(); + const external = createNode({ + type: "character", + seq: 0, + fields: { name: "Alice", state: "awake" }, + }); + const first = makeEvent(1, "事件1"); + const second = makeEvent(2, "事件2"); + addNode(graph, external); + addNode(graph, first); + addNode(graph, second); + addEdge( + graph, + createEdge({ + fromId: first.id, + toId: external.id, + relation: "mentions", + strength: 0.7, + }), + ); + + const originalSummarize = llm.callLLMForJSON; + llm.callLLMForJSON = async () => ({ + fields: { + title: "压缩事件", + summary: "合并摘要", + participants: "Alice", + 
status: "done", + }, + }); + + try { + const result = await compressType({ + graph, + typeDef: schema[0], + embeddingConfig: null, + force: true, + settings: {}, + }); + assert.equal(result.created, 1); + + const compressed = graph.nodes.find( + (node) => node.level === 1 && !node.archived, + ); + assert.ok(compressed); + const migrated = graph.edges.find( + (edge) => + edge.fromId === compressed.id && + edge.toId === external.id && + edge.relation === "mentions" && + !edge.invalidAt && + !edge.expiredAt, + ); + assert.ok(migrated); + } finally { + llm.callLLMForJSON = originalSummarize; + } +} + +async function testVectorIndexKeepsDirtyOnDirectPartialEmbeddingFailure() { + const graph = createEmptyGraph(); + const first = makeEvent(1, "向量事件1"); + const second = makeEvent(2, "向量事件2"); + addNode(graph, first); + addNode(graph, second); + graph.vectorIndexState.dirty = true; + graph.vectorIndexState.lastWarning = "旧 warning"; + + const originalEmbedBatch = embedding.embedBatch; + embedding.embedBatch = async () => [[0.1, 0.2], null]; + + try { + const result = await syncGraphVectorIndex( + graph, + { + mode: "direct", + source: "direct", + apiUrl: "https://example.com/v1", + model: "text-embedding-3-small", + }, + {}, + ); + + assert.equal(result.insertedHashes.length, 1); + assert.equal(graph.vectorIndexState.dirty, true); + assert.equal(typeof result.stats.pending, "number"); + assert.equal(graph.vectorIndexState.lastStats, result.stats); + assert.match( + graph.vectorIndexState.lastWarning, + /部分节点 embedding 生成失败/, + ); + assert.equal( + graph.vectorIndexState.lastWarning, + "部分节点 embedding 生成失败,向量索引仍待修复", + ); + assert.equal(second.embedding, null); + } finally { + embedding.embedBatch = originalEmbedBatch; + } +} + +async function testConsolidatorMergeFallbackKeepsNodeWhenTargetMissing() { + const graph = createEmptyGraph(); + const target = createNode({ + type: "event", + seq: 3, + fields: { + title: "旧记忆", + summary: "旧摘要", + participants: "Alice", + status: 
"active", + }, + }); + const incoming = createNode({ + type: "event", + seq: 8, + fields: { + title: "新记忆", + summary: "新摘要", + participants: "Alice", + status: "updated", + }, + }); + target.embedding = [0.9, 0.1]; + addNode(graph, target); + addNode(graph, incoming); + + const originalFindSimilar = embedding.searchSimilar; + const originalEmbedBatch = embedding.embedBatch; + const originalCall = llm.callLLMForJSON; + embedding.embedBatch = async () => [[0.2, 0.3]]; + embedding.searchSimilar = async () => [{ nodeId: target.id, score: 0.99 }]; + llm.callLLMForJSON = async () => ({ + results: [ + { + node_id: incoming.id, + action: "merge", + merge_target_id: "missing-node-id", + reason: "故意触发无效 merge target 回退", + }, + ], + }); + + try { + const stats = await consolidateMemories({ + graph, + newNodeIds: [incoming.id], + embeddingConfig: { + mode: "direct", + source: "direct", + apiUrl: "https://example.com/v1", + model: "text-embedding-3-small", + }, + settings: {}, + }); + + assert.equal(stats.merged, 0); + assert.equal(stats.kept, 1); + assert.equal(incoming.archived, false); + assert.deepEqual(target.embedding, [0.9, 0.1]); + } finally { + embedding.searchSimilar = originalFindSimilar; + embedding.embedBatch = originalEmbedBatch; + llm.callLLMForJSON = originalCall; + } +} + +async function testExtractorFailsOnUnknownOperation() { + const graph = createEmptyGraph(); + const originalCall = llm.callLLMForJSON; + llm.callLLMForJSON = async () => ({ + operations: [{ action: "nonsense", foo: 1 }], + }); + + try { + const result = await extractMemories({ + graph, + messages: [{ seq: 4, role: "assistant", content: "测试非法操作" }], + startSeq: 4, + endSeq: 4, + schema, + embeddingConfig: null, + settings: {}, + }); + + assert.equal(result.success, false); + assert.match(result.error, /未知操作类型/); + assert.equal(graph.lastProcessedSeq, -1); + } finally { + llm.callLLMForJSON = originalCall; + } +} + +async function testConsolidatorMergeUpdatesSeqRange() { + const graph = 
createEmptyGraph(); + const target = createNode({ + type: "event", + seq: 3, + seqRange: [3, 4], + fields: { + title: "旧记忆", + summary: "旧摘要", + participants: "Alice", + status: "active", + }, + }); + const incoming = createNode({ + type: "event", + seq: 8, + seqRange: [8, 9], + fields: { + title: "新记忆", + summary: "新摘要", + participants: "Alice", + status: "updated", + }, + }); + addNode(graph, target); + addNode(graph, incoming); + + const originalFindSimilar = embedding.searchSimilar; + const originalCall = llm.callLLMForJSON; + embedding.searchSimilar = async () => [{ nodeId: target.id, score: 0.99 }]; + llm.callLLMForJSON = async () => ({ + results: [ + { + node_id: incoming.id, + action: "merge", + merge_target_id: target.id, + merged_fields: { summary: "合并后摘要" }, + }, + ], + }); + + try { + const stats = await consolidateMemories({ + graph, + newNodeIds: [incoming.id], + embeddingConfig: null, + settings: {}, + }); + + assert.equal(stats.merged, 1); + assert.deepEqual(target.seqRange, [3, 9]); + assert.equal(target.seq, 8); + assert.equal(target.fields.summary, "合并后摘要"); + assert.equal(target.embedding, null); + assert.equal(incoming.archived, true); + } finally { + embedding.searchSimilar = originalFindSimilar; + llm.callLLMForJSON = originalCall; + } +} + +async function testBatchJournalVectorDeltaCapturesRecoveryFields() { + const before = normalizeGraphRuntimeState(createEmptyGraph(), "chat-a"); + const after = normalizeGraphRuntimeState(createEmptyGraph(), "chat-a"); + const beforeNode = createNode({ + id: "node-before", + type: "event", + seq: 1, + fields: { title: "旧", summary: "旧", participants: "A", status: "old" }, + }); + const afterNode = createNode({ + id: "node-before", + type: "event", + seq: 1, + fields: { title: "新", summary: "新", participants: "A", status: "new" }, + }); + addNode(before, beforeNode); + addNode(after, afterNode); + before.vectorIndexState.hashToNodeId = { hash_old: "node-before" }; + before.vectorIndexState.nodeToHash = { 
"node-before": "hash_old" }; + after.vectorIndexState.hashToNodeId = { + hash_new: "node-before", + hash_inserted: "node-extra", + }; + after.vectorIndexState.nodeToHash = { + "node-before": "hash_new", + "node-extra": "hash_inserted", + }; + after.vectorIndexState.replayRequiredNodeIds = ["node-before", "node-extra"]; + + const journal = createBatchJournalEntry(before, after, { + processedRange: [4, 6], + vectorHashesInserted: ["hash_inserted"], + }); + + assert.deepEqual(journal.vectorDelta.insertedHashes.sort(), [ + "hash_inserted", + "hash_new", + ]); + assert.deepEqual(journal.vectorDelta.removedHashes, ["hash_old"]); + assert.deepEqual(journal.vectorDelta.touchedNodeIds.sort(), [ + "node-before", + "node-extra", + ]); + assert.deepEqual(journal.vectorDelta.replayRequiredNodeIds.sort(), [ + "node-before", + "node-extra", + ]); + assert.deepEqual(journal.vectorDelta.backendDeleteHashes, ["hash_old"]); + assert.deepEqual(journal.vectorDelta.replacedMappings, [ + { nodeId: "node-before", previousHash: "hash_old", nextHash: "hash_new" }, + { nodeId: "node-extra", previousHash: "", nextHash: "hash_inserted" }, + ]); +} + +async function testReverseJournalRecoveryPlanLegacyFallback() { + const recoveryPlan = buildReverseJournalRecoveryPlan( + [ + { + processedRange: [5, 7], + vectorDelta: { + insertedHashes: ["hash_1"], + }, + }, + ], + 5, + ); + + assert.equal(recoveryPlan.legacyGapFallback, true); + assert.equal(recoveryPlan.dirtyReason, "legacy-gap"); + assert.equal(recoveryPlan.pendingRepairFromFloor, 5); + assert.deepEqual(recoveryPlan.backendDeleteHashes, ["hash_1"]); + assert.deepEqual(recoveryPlan.replayRequiredNodeIds, []); +} + +async function testReverseJournalRecoveryPlanAggregatesDeletesAndReplay() { + const recoveryPlan = buildReverseJournalRecoveryPlan( + [ + { + processedRange: [8, 9], + vectorDelta: { + insertedHashes: ["hash_new"], + removedHashes: ["hash_removed"], + replacedMappings: [ + { + nodeId: "node-1", + previousHash: "hash_old", + 
nextHash: "hash_new", + }, + ], + touchedNodeIds: ["node-1"], + replayRequiredNodeIds: ["node-2"], + backendDeleteHashes: ["hash_backend"], + }, + }, + { + processedRange: [4, 6], + vectorDelta: { + insertedHashes: ["hash_other"], + removedHashes: [], + replacedMappings: [], + touchedNodeIds: ["node-3"], + replayRequiredNodeIds: ["node-3"], + backendDeleteHashes: [], + }, + }, + ], + 6, + ); + + assert.equal(recoveryPlan.legacyGapFallback, false); + assert.equal(recoveryPlan.dirtyReason, "history-recovery-replay"); + assert.equal(recoveryPlan.pendingRepairFromFloor, 4); + assert.deepEqual(recoveryPlan.backendDeleteHashes.sort(), [ + "hash_backend", + "hash_new", + "hash_old", + "hash_other", + "hash_removed", + ]); + assert.deepEqual(recoveryPlan.replayRequiredNodeIds.sort(), [ + "node-1", + "node-2", + "node-3", + ]); + assert.deepEqual(recoveryPlan.touchedNodeIds.sort(), ["node-1", "node-3"]); +} + +async function testReverseJournalRollbackStateFormsReplayClosure() { + const before = normalizeGraphRuntimeState(createEmptyGraph(), "chat-replay"); + const after = normalizeGraphRuntimeState(createEmptyGraph(), "chat-replay"); + const stableNode = createNode({ + id: "node-stable", + type: "event", + seq: 1, + fields: { + title: "稳定节点", + summary: "稳定摘要", + participants: "Alice", + status: "stable", + }, + }); + const touchedBefore = createNode({ + id: "node-touched", + type: "event", + seq: 2, + fields: { + title: "回滚前节点", + summary: "旧摘要", + participants: "Bob", + status: "old", + }, + }); + const touchedAfter = createNode({ + id: "node-touched", + type: "event", + seq: 5, + fields: { + title: "回滚后节点", + summary: "新摘要", + participants: "Bob", + status: "updated", + }, + }); + const appendedNode = createNode({ + id: "node-appended", + type: "event", + seq: 6, + fields: { + title: "新增节点", + summary: "新增摘要", + participants: "Cara", + status: "new", + }, + }); + addNode(before, stableNode); + addNode(before, touchedBefore); + addNode(after, stableNode); + addNode(after, 
touchedAfter); + addNode(after, appendedNode); + + before.historyState.lastProcessedAssistantFloor = 3; + before.historyState.processedMessageHashes = { + 0: "h0", + 1: "h1", + 2: "h2", + 3: "h3", + }; + before.historyState.extractionCount = 1; + before.vectorIndexState.hashToNodeId = { + hash_stable: stableNode.id, + hash_old: touchedBefore.id, + }; + before.vectorIndexState.nodeToHash = { + [stableNode.id]: "hash_stable", + [touchedBefore.id]: "hash_old", + }; + + after.historyState.lastProcessedAssistantFloor = 6; + after.historyState.processedMessageHashes = { + 0: "h0", + 1: "h1", + 2: "h2", + 3: "h3", + 4: "h4", + 5: "h5", + 6: "h6", + }; + after.historyState.extractionCount = 2; + after.vectorIndexState.hashToNodeId = { + hash_stable: stableNode.id, + hash_new: touchedAfter.id, + hash_added: appendedNode.id, + }; + after.vectorIndexState.nodeToHash = { + [stableNode.id]: "hash_stable", + [touchedAfter.id]: "hash_new", + [appendedNode.id]: "hash_added", + }; + after.vectorIndexState.replayRequiredNodeIds = [appendedNode.id]; + + const journal = createBatchJournalEntry(before, after, { + processedRange: [4, 6], + extractionCountBefore: before.historyState.extractionCount, + }); + + const runtimeGraph = normalizeGraphRuntimeState( + JSON.parse(JSON.stringify(after)), + "chat-replay", + ); + rollbackBatch(runtimeGraph, journal); + + assert.deepEqual(runtimeGraph.nodes.map((node) => node.id).sort(), [ + stableNode.id, + touchedBefore.id, + ]); + assert.deepEqual(runtimeGraph.vectorIndexState.hashToNodeId, { + hash_stable: stableNode.id, + hash_old: touchedBefore.id, + }); + assert.deepEqual(runtimeGraph.vectorIndexState.nodeToHash, { + [stableNode.id]: "hash_stable", + [touchedBefore.id]: "hash_old", + }); + assert.equal(runtimeGraph.historyState.lastProcessedAssistantFloor, 3); + + const recoveryPlan = buildReverseJournalRecoveryPlan([journal], 4); + runtimeGraph.vectorIndexState.replayRequiredNodeIds = [stableNode.id]; + runtimeGraph.vectorIndexState.dirty = 
false; + runtimeGraph.vectorIndexState.dirtyReason = ""; + runtimeGraph.vectorIndexState.pendingRepairFromFloor = null; + + const replayRequiredNodeIds = new Set( + runtimeGraph.vectorIndexState.replayRequiredNodeIds, + ); + for (const nodeId of recoveryPlan.replayRequiredNodeIds) { + replayRequiredNodeIds.add(nodeId); + } + runtimeGraph.vectorIndexState.replayRequiredNodeIds = [ + ...replayRequiredNodeIds, + ]; + runtimeGraph.vectorIndexState.dirty = true; + runtimeGraph.vectorIndexState.dirtyReason = + recoveryPlan.dirtyReason || + runtimeGraph.vectorIndexState.dirtyReason || + "history-recovery-replay"; + runtimeGraph.vectorIndexState.pendingRepairFromFloor = + recoveryPlan.pendingRepairFromFloor; + runtimeGraph.vectorIndexState.lastWarning = recoveryPlan.legacyGapFallback + ? "历史恢复检测到 legacy-gap,向量索引需按受影响后缀修复" + : "历史恢复后需要修复受影响后缀的向量索引"; + + assert.deepEqual( + runtimeGraph.vectorIndexState.replayRequiredNodeIds.sort(), + [appendedNode.id, stableNode.id, touchedBefore.id].sort(), + ); + assert.equal(runtimeGraph.vectorIndexState.pendingRepairFromFloor, 4); + assert.equal( + runtimeGraph.vectorIndexState.dirtyReason, + "history-recovery-replay", + ); + assert.equal( + runtimeGraph.vectorIndexState.lastWarning, + "历史恢复后需要修复受影响后缀的向量索引", + ); + assert.deepEqual(runtimeGraph.vectorIndexState.hashToNodeId, { + hash_stable: stableNode.id, + hash_old: touchedBefore.id, + }); + assert.deepEqual(runtimeGraph.vectorIndexState.nodeToHash, { + [stableNode.id]: "hash_stable", + [touchedBefore.id]: "hash_old", + }); +} + /* Builds a recovery plan from two journal entries (range [10, 12] with a full vectorDelta, range [7, 9] with only insertedHashes) and a second argument of 9, then checks that legacyGapFallback is set, dirtyReason is "legacy-gap", pendingRepairFromFloor is 7, and that the replay, touched, and backend-delete lists match the expected values; the expected backendDeleteHashes list names every hash that appears in either entry. */ +async function testReverseJournalRecoveryPlanMixedLegacyAndCurrentRetainsRepairSet() { + const recoveryPlan = buildReverseJournalRecoveryPlan( + [ + { + processedRange: [10, 12], + vectorDelta: { + insertedHashes: ["hash-current"], + removedHashes: ["hash-removed"], + replacedMappings: [ + { + nodeId: "node-current", + previousHash: "hash-prev", + nextHash: "hash-current", + }, + ], + touchedNodeIds: ["node-current"], + replayRequiredNodeIds: ["node-extra"],
+ backendDeleteHashes: ["hash-backend"], + }, + }, + { + processedRange: [7, 9], + vectorDelta: { + insertedHashes: ["hash-legacy"], + }, + }, + ], + 9, + ); + + assert.equal(recoveryPlan.legacyGapFallback, true); + assert.equal(recoveryPlan.dirtyReason, "legacy-gap"); + assert.equal(recoveryPlan.pendingRepairFromFloor, 7); + assert.deepEqual(recoveryPlan.replayRequiredNodeIds.sort(), [ + "node-current", + "node-extra", + ]); + assert.deepEqual(recoveryPlan.touchedNodeIds, ["node-current"]); + assert.deepEqual(recoveryPlan.backendDeleteHashes.sort(), [ + "hash-backend", + "hash-current", + "hash-legacy", + "hash-prev", + "hash-removed", + ]); +} + /* Stubs compressAll to throw and syncVectorState to succeed, runs handleExtractionSuccess, and expects core and finalize to be "success", structural to be "partial", overall outcome "partial", completed true, consistency "weak", and a first warning matching the compression-failure pattern. */ +async function testBatchStatusStructuralPartialRemainsRecoverable() { + const harness = await createBatchStageHarness(); + const { createBatchStatusSkeleton, handleExtractionSuccess } = harness.result; + harness.currentGraph = { + historyState: { extractionCount: 0 }, + vectorIndexState: {}, + }; + harness.ensureCurrentGraphRuntimeState = () => { + harness.currentGraph.historyState ||= {}; + harness.currentGraph.vectorIndexState ||= {}; + }; + harness.compressAll = async () => { + throw new Error("compression down"); + }; + harness.syncVectorState = async () => ({ + insertedHashes: ["hash-ok"], + stats: { pending: 0 }, + }); + + const batchStatus = createBatchStatusSkeleton({ + processedRange: [2, 4], + extractionCountBefore: 0, + }); + const effects = await handleExtractionSuccess( + { newNodeIds: ["node-1"] }, + 4, + { + enableConsolidation: false, + enableSynopsis: false, + enableReflection: false, + enableSleepCycle: false, + synopsisEveryN: 1, + reflectEveryN: 1, + sleepEveryN: 1, + }, + undefined, + batchStatus, + ); + + assert.equal(effects.batchStatus.stages.core.outcome, "success"); + assert.equal(effects.batchStatus.stages.structural.outcome, "partial"); + assert.equal(effects.batchStatus.stages.finalize.outcome, "success"); + assert.equal(effects.batchStatus.outcome, "partial"); +
assert.equal(effects.batchStatus.completed, true); + assert.equal(effects.batchStatus.consistency, "weak"); + assert.match(effects.batchStatus.warnings[0], /压缩阶段失败/); +} + /* Enables synopsis with generateSynopsis stubbed to throw; expects core and finalize "success", semantic "failed", overall outcome "failed", completed true, and a first error matching the synopsis-failure pattern. */ +async function testBatchStatusSemanticFailureDoesNotHideCoreSuccess() { + const harness = await createBatchStageHarness(); + const { createBatchStatusSkeleton, handleExtractionSuccess } = harness.result; + harness.currentGraph = { + historyState: { extractionCount: 0 }, + vectorIndexState: {}, + }; + harness.ensureCurrentGraphRuntimeState = () => { + harness.currentGraph.historyState ||= {}; + harness.currentGraph.vectorIndexState ||= {}; + }; + harness.generateSynopsis = async () => { + throw new Error("semantic down"); + }; + harness.syncVectorState = async () => ({ + insertedHashes: [], + stats: { pending: 0 }, + }); + + const batchStatus = createBatchStatusSkeleton({ + processedRange: [5, 5], + extractionCountBefore: 0, + }); + const effects = await handleExtractionSuccess( + { newNodeIds: ["node-2"] }, + 5, + { + enableConsolidation: false, + enableSynopsis: true, + enableReflection: false, + enableSleepCycle: false, + synopsisEveryN: 1, + reflectEveryN: 1, + sleepEveryN: 1, + }, + undefined, + batchStatus, + ); + + assert.equal(effects.batchStatus.stages.core.outcome, "success"); + assert.equal(effects.batchStatus.stages.semantic.outcome, "failed"); + assert.equal(effects.batchStatus.stages.finalize.outcome, "success"); + assert.equal(effects.batchStatus.outcome, "failed"); + assert.equal(effects.batchStatus.completed, true); + assert.match(effects.batchStatus.errors[0], /概要生成失败/); +} + /* Makes syncVectorState return an error with one pending item; expects core "success", finalize "failed", overall outcome "failed", completed false, consistency "weak", and effects.vectorError equal to the stubbed error text. */ +async function testBatchStatusFinalizeFailureIsNotCompleteSuccess() { + const harness = await createBatchStageHarness(); + const { createBatchStatusSkeleton, handleExtractionSuccess } = harness.result; + harness.currentGraph = { + historyState: { extractionCount: 0 }, + vectorIndexState: {}, + }; + harness.ensureCurrentGraphRuntimeState = () => { + harness.currentGraph.historyState ||= {}; +
harness.currentGraph.vectorIndexState ||= {}; + }; + harness.syncVectorState = async () => ({ + insertedHashes: [], + stats: { pending: 1 }, + error: "vector finalize down", + }); + + const batchStatus = createBatchStatusSkeleton({ + processedRange: [6, 7], + extractionCountBefore: 0, + }); + const effects = await handleExtractionSuccess( + { newNodeIds: ["node-3"] }, + 7, + { + enableConsolidation: false, + enableSynopsis: false, + enableReflection: false, + enableSleepCycle: false, + synopsisEveryN: 1, + reflectEveryN: 1, + sleepEveryN: 1, + }, + undefined, + batchStatus, + ); + + assert.equal(effects.batchStatus.stages.core.outcome, "success"); + assert.equal(effects.batchStatus.stages.finalize.outcome, "failed"); + assert.equal(effects.batchStatus.outcome, "failed"); + assert.equal(effects.batchStatus.completed, false); + assert.equal(effects.batchStatus.consistency, "weak"); + assert.equal(effects.vectorError, "vector finalize down"); +} + /* Builds three batch statuses by hand with setBatchStageOutcome and finalizeBatchStatus and checks shouldAdvanceProcessedHistory: false for a structural "partial", false for a semantic "failed" even though that status is completed with "strong" consistency, and true only when all four stages are "success". */ +async function testProcessedHistoryAdvanceRequiresCompleteStrongSuccess() { + const harness = await createBatchStageHarness(); + const { + createBatchStatusSkeleton, + finalizeBatchStatus, + setBatchStageOutcome, + shouldAdvanceProcessedHistory, + } = harness.result; + + const structuralPartial = createBatchStatusSkeleton({ + processedRange: [2, 4], + extractionCountBefore: 0, + }); + setBatchStageOutcome(structuralPartial, "core", "success"); + setBatchStageOutcome( + structuralPartial, + "structural", + "partial", + "compression down", + ); + setBatchStageOutcome(structuralPartial, "finalize", "success"); + finalizeBatchStatus(structuralPartial); + assert.equal(structuralPartial.completed, true); + assert.equal(structuralPartial.outcome, "partial"); + assert.equal(structuralPartial.consistency, "weak"); + assert.equal(shouldAdvanceProcessedHistory(structuralPartial), false); + + const semanticFailed = createBatchStatusSkeleton({ + processedRange: [5, 5], + extractionCountBefore: 0, + }); +
setBatchStageOutcome(semanticFailed, "core", "success"); + setBatchStageOutcome(semanticFailed, "semantic", "failed", "semantic down"); + setBatchStageOutcome(semanticFailed, "finalize", "success"); + finalizeBatchStatus(semanticFailed); + assert.equal(semanticFailed.completed, true); + assert.equal(semanticFailed.outcome, "failed"); + assert.equal(semanticFailed.consistency, "strong"); + assert.equal(shouldAdvanceProcessedHistory(semanticFailed), false); + + const fullSuccess = createBatchStatusSkeleton({ + processedRange: [8, 9], + extractionCountBefore: 0, + }); + setBatchStageOutcome(fullSuccess, "core", "success"); + setBatchStageOutcome(fullSuccess, "structural", "success"); + setBatchStageOutcome(fullSuccess, "semantic", "success"); + setBatchStageOutcome(fullSuccess, "finalize", "success"); + finalizeBatchStatus(fullSuccess); + assert.equal(fullSuccess.completed, true); + assert.equal(fullSuccess.outcome, "success"); + assert.equal(fullSuccess.consistency, "strong"); + assert.equal(shouldAdvanceProcessedHistory(fullSuccess), true); +} + /* Fires onGenerationAfterCommands and then onBeforeCombinePrompts for the same chat input; expects exactly one recorded recall call, with hookName "GENERATION_AFTER_COMMANDS". */ +async function testGenerationRecallTransactionDedupesDoubleHookBySameKey() { + const harness = await createGenerationRecallHarness(); + harness.chat = [{ is_user: true, mes: "同一轮输入" }]; + + await harness.result.onGenerationAfterCommands("normal", {}, false); + await harness.result.onBeforeCombinePrompts(); + + assert.equal(harness.runRecallCalls.length, 1); + assert.equal(harness.runRecallCalls[0].hookName, "GENERATION_AFTER_COMMANDS"); +} + /* Fires only onBeforeCombinePrompts; expects one recorded recall call with hookName "GENERATE_BEFORE_COMBINE_PROMPTS". */ +async function testGenerationRecallBeforeCombineRunsStandalone() { + const harness = await createGenerationRecallHarness(); + harness.chat = [{ is_user: true, mes: "仅 before combine" }]; + + await harness.result.onBeforeCombinePrompts(); + + assert.equal(harness.runRecallCalls.length, 1); + assert.equal( + harness.runRecallCalls[0].hookName, + "GENERATE_BEFORE_COMBINE_PROMPTS", + ); +} + /* Fires onGenerationAfterCommands twice with different user messages; expects two recorded recall calls whose recallKey values differ. */ +async function testGenerationRecallDifferentKeyCanRunAgain() { + const harness = await
createGenerationRecallHarness(); + harness.chat = [{ is_user: true, mes: "第一条" }]; + await harness.result.onGenerationAfterCommands("normal", {}, false); + + harness.chat = [{ is_user: true, mes: "第二条" }]; + await harness.result.onGenerationAfterCommands("normal", {}, false); + + assert.equal(harness.runRecallCalls.length, 2); + assert.notEqual( + harness.runRecallCalls[0].recallKey, + harness.runRecallCalls[1].recallKey, + ); +} + /* Runs each regression test in sequence via top-level await; the success message below is logged only after every awaited test has resolved. */ +await testCompressorMigratesEdgesToCompressedNode(); +await testVectorIndexKeepsDirtyOnDirectPartialEmbeddingFailure(); +await testExtractorFailsOnUnknownOperation(); +await testConsolidatorMergeUpdatesSeqRange(); +await testConsolidatorMergeFallbackKeepsNodeWhenTargetMissing(); +await testBatchJournalVectorDeltaCapturesRecoveryFields(); +await testReverseJournalRecoveryPlanLegacyFallback(); +await testReverseJournalRecoveryPlanAggregatesDeletesAndReplay(); +await testReverseJournalRollbackStateFormsReplayClosure(); +await testReverseJournalRecoveryPlanMixedLegacyAndCurrentRetainsRepairSet(); +await testBatchStatusStructuralPartialRemainsRecoverable(); +await testBatchStatusSemanticFailureDoesNotHideCoreSuccess(); +await testBatchStatusFinalizeFailureIsNotCompleteSuccess(); +await testProcessedHistoryAdvanceRequiresCompleteStrongSuccess(); +await testGenerationRecallTransactionDedupesDoubleHookBySameKey(); +await testGenerationRecallBeforeCombineRunsStandalone(); +await testGenerationRecallDifferentKeyCanRunAgain(); + +console.log("p0-regressions tests passed"); diff --git a/vector-index.js b/vector-index.js index 1209de6..b735843 100644 --- a/vector-index.js +++ b/vector-index.js @@ -614,6 +614,7 @@ export async function syncGraphVectorIndex( } } + /* Flag raised when an entry takes the else branch added in the next hunk; presumably that means its embedding was missing, but the guarding condition is outside this hunk. */ let directSyncHadFailures = false; if (entriesToEmbed.length > 0) { throwIfAborted(signal); const embeddings = await embedBatch( @@ -634,6 +635,8 @@ export async function syncGraphVectorIndex( state.hashToNodeId[entry.hash] = entry.nodeId; state.nodeToHash[entry.nodeId] = entry.hash;
insertedHashes.push(entry.hash); + } else { + directSyncHadFailures = true; } } } @@ -642,10 +645,16 @@ export async function syncGraphVectorIndex( state.source = "direct"; state.modelScope = getVectorModelScope(config); state.collectionId = collectionId; + /* Leave the index dirty, with a warning, when the failure flag was raised. */ state.dirty = directSyncHadFailures; + state.lastWarning = directSyncHadFailures + ? "部分节点 embedding 生成失败,向量索引仍待修复" + : ""; } - state.dirty = false; - state.lastWarning = ""; + /* Unconditionally mark the state clean only when mode is not "direct", so the dirty flag and warning assigned in the block above are not overwritten. NOTE(review): that block assigns state.source, not state.mode; confirm state.mode is "direct" on that path. */ if (state.mode !== "direct") { + state.dirty = false; + state.lastWarning = ""; + } state.lastSyncAt = Date.now(); state.lastStats = computeVectorStats( graph,