perf: 记忆整合批量化 — 将 N 次 LLM 压缩为 1 次批量调用

- 重构 consolidator.js 为 4 阶段架构:
  Phase 0: 收集有效新节点
  Phase 1: 批量 Embed (直连 1 次 embedBatch / 后端逐次)
  Phase 2: 各节点查近邻 (直连本地 cosine / 后端逐次 query)
  Phase 3: 单次 LLM 批量判定 (统一 prompt 含所有新节点)
  Phase 4: 逐个处理结果

- 更新 panel.js DEFAULT_PROMPTS.consolidation 为 results 数组格式
- 直连模式: N embed + N LLM -> 1 embedBatch + 1 LLM (-67~75%)
- 后端模式: N query + N LLM -> N query + 1 LLM (-33~50%)
- 总体效果: 原始 6 LLM -> 2 LLM (2新节点场景)
This commit is contained in:
Youzini-afk
2026-03-25 14:41:44 +08:00
parent 1fc7570614
commit f5a5608a12
2 changed files with 333 additions and 224 deletions

View File

@@ -1,12 +1,14 @@
// ST-BME: 统一记忆整合引擎
// ST-BME: 统一记忆整合引擎(批量化版)
// 合并 Mem0 精确对照 + A-MEM 记忆进化为单一阶段
// 每个新节点只需 1 次 embed + 1 次 LLM 调用
// 批量 embed + 批量查近邻 + 单次 LLM 调用
import { embedBatch, searchSimilar } from './embedding.js';
import { addEdge, createEdge, getActiveNodes, getNode } from './graph.js';
import { callLLMForJSON } from './llm.js';
import {
buildNodeVectorText,
findSimilarNodesByText,
isDirectVectorConfig,
validateVectorConfig,
} from './vector-index.js';
@@ -27,8 +29,7 @@ function throwIfAborted(signal) {
}
/**
* 统一记忆整合系统提示词
* 同时完成 Mem0 冲突判定 + A-MEM 进化分析
* 统一记忆整合系统提示词(支持批量输出)
*/
const CONSOLIDATION_SYSTEM_PROMPT = `你是一个记忆整合分析器。当新记忆加入知识图谱时,你需要同时完成两项任务:
@@ -45,24 +46,30 @@ const CONSOLIDATION_SYSTEM_PROMPT = `你是一个记忆整合分析器。当新
输出严格 JSON
{
"action": "keep" | "merge" | "skip",
"merge_target_id": "仅 action=merge 时必填:要合并到的旧节点 ID",
"merged_fields": { "仅 action=merge 时可选:合并后的字段更新" },
"reason": "判定理由(简述)",
"evolution": {
"should_evolve": true/false,
"connections": ["需要建立链接的旧记忆 ID 列表"],
"neighbor_updates": [
{
"nodeId": "需更新的旧节点 ID",
"newContext": "基于新信息修正后的描述(不需修改则为 null",
"newTags": ["更新后的分类标签,不需修改则为 null"]
"results": [
{
"node_id": "新记忆的节点 ID",
"action": "keep" | "merge" | "skip",
"merge_target_id": "仅 action=merge 时必填:要合并到的旧节点 ID",
"merged_fields": { "仅 action=merge 时可选:合并后的字段更新" },
"reason": "判定理由(简述)",
"evolution": {
"should_evolve": true/false,
"connections": ["需要建立链接的旧记忆 ID 列表"],
"neighbor_updates": [
{
"nodeId": "需更新的旧节点 ID",
"newContext": "基于新信息修正后的描述(不需修改则为 null",
"newTags": ["更新后的分类标签,不需修改则为 null"]
}
]
}
]
}
}
]
}
整合规则:
- 必须对每条新记忆都给出一个 result 条目
- 当 action=skip 时evolution 可省略或设 should_evolve=false
- 当 action=merge 时evolution 可省略或设 should_evolve=false
- 仅当 action=keep 且新信息确实改变了对旧记忆的理解时,才设 should_evolve=true
@@ -72,19 +79,23 @@ const CONSOLIDATION_SYSTEM_PROMPT = `你是一个记忆整合分析器。当新
- neighbor_updates 中每条必须有实际意义的修改`;
/**
* 统一记忆整合主函数
* 统一记忆整合主函数(批量化版)
*
* 合并了原先的 mem0ConflictCheck精确对照和 evolveMemories进化
* 实现"1 次 embed + 1 次 LLM"完成冲突检测 + 进化分析。
* 4 阶段架构:
* Phase 0: 收集有效新节点
* Phase 1: 批量 Embed直连 1 次 embedBatch / 后端逐次)
* Phase 2: 各节点查近邻(直连本地 cosine / 后端逐次 query
* Phase 3: 单次 LLM 批量判定
* Phase 4: 逐个处理结果
*
* @param {object} params
* @param {object} params.graph - 当前图状态
* @param {string[]} params.newNodeIds - 本次新创建的节点 ID 列表
* @param {object} params.embeddingConfig - Embedding API 配置
* @param {object} [params.options]
* @param {number} [params.options.neighborCount=5] - 近邻搜索数量
* @param {number} [params.options.conflictThreshold=0.85] - 冲突判定阈值(低于此值跳过冲突检测)
* @param {string} [params.customPrompt] - 自定义提示词
* @param {number} [params.options.neighborCount=5]
* @param {number} [params.options.conflictThreshold=0.85]
* @param {string} [params.customPrompt]
* @param {AbortSignal} [params.signal]
* @returns {Promise<{merged: number, skipped: number, kept: number, evolved: number, connections: number, updates: number}>}
*/
@@ -113,218 +124,200 @@ export async function consolidateMemories({
return stats;
}
// ══════════════════════════════════════════════
// Phase 0: 收集有效新节点
// ══════════════════════════════════════════════
const newEntries = [];
for (const id of newNodeIds) {
const node = getNode(graph, id);
if (!node || node.archived) continue;
const text = buildNodeVectorText(node);
if (!text) continue;
newEntries.push({ id, node, text });
}
if (newEntries.length === 0) return stats;
const activeNodes = getActiveNodes(graph).filter(n => {
const text = buildNodeVectorText(n);
return typeof text === 'string' && text.length > 0;
});
if (activeNodes.length < 2) return stats;
if (activeNodes.length < 2) {
// 图中节点不够,全部 keep
stats.kept = newEntries.length;
return stats;
}
for (const newId of newNodeIds) {
throwIfAborted(signal);
const newNode = getNode(graph, newId);
if (!newNode || newNode.archived) continue;
throwIfAborted(signal);
console.log(`[ST-BME] 记忆整合开始: ${newEntries.length} 个新节点`);
const queryText = buildNodeVectorText(newNode);
if (!queryText) continue;
// ══════════════════════════════════════════════
// Phase 1 + 2: 批量 Embed + 查近邻
// ══════════════════════════════════════════════
/** @type {Map<string, Array<{nodeId: string, score: number}>>} */
const neighborsMap = new Map();
// 排除自身的候选池
const candidates = activeNodes.filter(n => n.id !== newId);
if (candidates.length === 0) {
stats.kept++;
continue;
}
if (isDirectVectorConfig(embeddingConfig)) {
// ── 直连模式: 1 次 embedBatch + N 次本地 cosine ──
const texts = newEntries.map(e => e.text);
let queryVectors;
try {
// ── 1次 Embed查近邻 ──
const neighbors = await findSimilarNodesByText(
graph,
queryText,
embeddingConfig,
neighborCount,
candidates,
signal,
);
queryVectors = await embedBatch(texts, embeddingConfig, { signal });
} catch (e) {
if (isAbortError(e)) throw e;
console.warn('[ST-BME] 批量 embed 失败,回退到逐条:', e.message);
queryVectors = null;
}
if (neighbors.length === 0) {
stats.kept++;
continue;
// 构建候选池(含 embedding 的活跃节点)
const candidatePool = activeNodes
.filter(n => Array.isArray(n.embedding) && n.embedding.length > 0)
.map(n => ({ nodeId: n.id, embedding: n.embedding }));
for (let i = 0; i < newEntries.length; i++) {
throwIfAborted(signal);
const entry = newEntries[i];
const candidates = candidatePool.filter(c => c.nodeId !== entry.id);
if (queryVectors?.[i] && candidates.length > 0) {
// 本地 cosine 搜索0 API 调用)
const neighbors = searchSimilar(queryVectors[i], candidates, neighborCount);
neighborsMap.set(entry.id, neighbors);
} else {
// fallback: 逐条 embed
try {
const neighbors = await findSimilarNodesByText(
graph, entry.text, embeddingConfig, neighborCount,
activeNodes.filter(n => n.id !== entry.id), signal,
);
neighborsMap.set(entry.id, neighbors);
} catch (e) {
if (isAbortError(e)) throw e;
console.warn(`[ST-BME] 近邻查询失败 (${entry.id}):`, e.message);
neighborsMap.set(entry.id, []);
}
}
}
} else {
// ── 后端模式: 逐条 /api/vector/query ──
for (let i = 0; i < newEntries.length; i++) {
throwIfAborted(signal);
const entry = newEntries[i];
try {
const neighbors = await findSimilarNodesByText(
graph, entry.text, embeddingConfig, neighborCount,
activeNodes.filter(n => n.id !== entry.id), signal,
);
neighborsMap.set(entry.id, neighbors);
} catch (e) {
if (isAbortError(e)) throw e;
console.warn(`[ST-BME] 近邻查询失败 (${entry.id}):`, e.message);
neighborsMap.set(entry.id, []);
}
}
}
// 构建近邻描述文本
const neighborsContext = neighbors.map(n => {
// ══════════════════════════════════════════════
// Phase 3: 单次 LLM 批量判定
// ══════════════════════════════════════════════
throwIfAborted(signal);
const userPromptSections = [];
userPromptSections.push(`本轮共新增 ${newEntries.length} 条记忆,请逐条分析:\n`);
for (let i = 0; i < newEntries.length; i++) {
const entry = newEntries[i];
const neighbors = neighborsMap.get(entry.id) || [];
const newNodeFieldsStr = Object.entries(entry.node.fields)
.map(([k, v]) => `${k}: ${v}`)
.join(', ');
// 构建近邻描述
let neighborText;
if (neighbors.length === 0) {
neighborText = ' (无近邻命中)';
} else {
neighborText = neighbors.map(n => {
const node = getNode(graph, n.nodeId);
if (!node) return null;
const fieldsStr = Object.entries(node.fields)
.map(([k, v]) => `${k}: ${v}`)
.join(', ');
return `[${node.id}] 类型=${node.type}, ${fieldsStr}, 相似度=${n.score.toFixed(3)}${
(node.clusters || []).length > 0 ? `, 分类=${node.clusters.join('/')}` : ''
}`;
return ` - [${node.id}] 类型=${node.type}, ${fieldsStr} (相似度=${n.score.toFixed(3)})`;
}).filter(Boolean).join('\n');
const newNodeFieldsStr = Object.entries(newNode.fields)
.map(([k, v]) => `${k}: ${v}`)
.join(', ');
// 检查是否有高相似度命中(决定是否启用冲突检测部分的提示)
const hasHighSimilarity = neighbors[0].score > conflictThreshold;
const userPrompt = [
'## 新加入的记忆',
`[${newNode.id}] 类型=${newNode.type}, ${newNodeFieldsStr}`,
'',
'## 最近邻的已有记忆',
neighborsContext,
'',
`${neighbors.length} 条近邻记忆。`,
hasHighSimilarity
? `最高相似度 ${neighbors[0].score.toFixed(3)} 超过阈值 ${conflictThreshold},请先判断是否冲突/重复,再分析进化关系。`
: '相似度均较低,请重点分析新记忆是否揭示了关于旧记忆的新信息。',
].join('\n');
// ── 1次 LLM统一判定 ──
const decision = await callLLMForJSON({
systemPrompt: customPrompt || CONSOLIDATION_SYSTEM_PROMPT,
userPrompt,
maxRetries: 1,
signal,
});
if (!decision) {
stats.kept++;
continue;
}
// ── 处理 action ──
switch (decision.action) {
case 'skip': {
console.log(`[ST-BME] 记忆整合: skip (重复) — ${newId}`);
newNode.archived = true;
stats.skipped++;
break;
}
case 'merge': {
const targetId = decision.merge_target_id;
const targetNode = targetId ? getNode(graph, targetId) : null;
if (targetNode && !targetNode.archived) {
console.log(`[ST-BME] 记忆整合: merge ${newId}${targetId}`);
// 合并字段到旧节点
if (decision.merged_fields && typeof decision.merged_fields === 'object') {
for (const [key, value] of Object.entries(decision.merged_fields)) {
if (value != null && value !== '') {
targetNode.fields[key] = value;
}
}
} else {
// 如果没提供 merged_fields将新节点的非空字段补充到旧节点
for (const [key, value] of Object.entries(newNode.fields)) {
if (value != null && value !== '' && !targetNode.fields[key]) {
targetNode.fields[key] = value;
}
}
}
// 更新旧节点的 seq 为更新的值
if (Number.isFinite(newNode.seq) && newNode.seq > (targetNode.seq || 0)) {
targetNode.seq = newNode.seq;
}
// 标记旧节点需要 re-embed
targetNode.embedding = null;
// 归档新节点
newNode.archived = true;
stats.merged++;
} else {
// merge target 无效,回退为 keep
console.warn(`[ST-BME] 记忆整合: merge target ${targetId} 不存在,回退为 keep`);
stats.kept++;
}
break;
}
case 'keep':
default: {
stats.kept++;
break;
}
}
// ── 处理 evolution仅 keep 时有意义,但也容错处理其它 action ──
const evolution = decision.evolution;
if (evolution?.should_evolve && !newNode.archived) {
stats.evolved++;
console.log(`[ST-BME] 记忆整合/进化触发: ${decision.reason || '(无理由)'}`);
// 建立关联边
if (Array.isArray(evolution.connections)) {
for (const targetId of evolution.connections) {
if (!getNode(graph, targetId)) continue;
const edge = createEdge({
fromId: newId,
toId: targetId,
relation: 'related',
strength: 0.7,
});
if (addEdge(graph, edge)) {
stats.connections++;
}
}
}
// 反向更新旧节点
if (Array.isArray(evolution.neighbor_updates)) {
for (const update of evolution.neighbor_updates) {
if (!update.nodeId) continue;
const oldNode = getNode(graph, update.nodeId);
if (!oldNode || oldNode.archived) continue;
let changed = false;
// 更新 context/state 字段
if (update.newContext && typeof update.newContext === 'string') {
if (oldNode.fields.state !== undefined) {
oldNode.fields.state = update.newContext;
changed = true;
} else if (oldNode.fields.summary !== undefined) {
oldNode.fields.summary = update.newContext;
changed = true;
} else if (oldNode.fields.core_note !== undefined) {
oldNode.fields.core_note = update.newContext;
changed = true;
}
}
// 更新分类标签
if (update.newTags && Array.isArray(update.newTags)) {
oldNode.clusters = update.newTags;
changed = true;
}
if (changed) {
oldNode.embedding = null;
if (!oldNode._evolutionHistory) oldNode._evolutionHistory = [];
oldNode._evolutionHistory.push({
triggeredBy: newId,
timestamp: Date.now(),
reason: decision.reason || '',
});
stats.updates++;
}
}
}
}
} catch (e) {
if (isAbortError(e)) throw e;
console.error(`[ST-BME] 记忆整合失败 (${newId}):`, e);
stats.kept++;
}
// 检查高相似度
const hasHighSimilarity = neighbors.length > 0 && neighbors[0].score > conflictThreshold;
const hint = hasHighSimilarity
? ` ⚠ 最高相似度 ${neighbors[0].score.toFixed(3)} 超过阈值 ${conflictThreshold}`
: '';
userPromptSections.push([
`### 新记忆 #${i + 1}`,
`[${entry.id}] 类型=${entry.node.type}, ${newNodeFieldsStr}`,
'近邻记忆:',
neighborText,
hint,
].filter(Boolean).join('\n'));
}
const userPrompt = userPromptSections.join('\n\n');
let decision;
try {
decision = await callLLMForJSON({
systemPrompt: customPrompt || CONSOLIDATION_SYSTEM_PROMPT,
userPrompt,
maxRetries: 1,
signal,
});
} catch (e) {
if (isAbortError(e)) throw e;
console.error('[ST-BME] 记忆整合 LLM 调用失败:', e);
stats.kept = newEntries.length;
return stats;
}
// ══════════════════════════════════════════════
// Phase 4: 逐个处理结果
// ══════════════════════════════════════════════
// 解析 LLM 返回——兼容单条和批量格式
let results;
if (Array.isArray(decision?.results)) {
results = decision.results;
} else if (decision?.action) {
// 单条返回格式LLM 可能忽略 results 包装)
results = [{ ...decision, node_id: newEntries[0]?.id }];
} else {
console.warn('[ST-BME] 记忆整合: LLM 返回格式异常,全部 keep');
stats.kept = newEntries.length;
return stats;
}
// 建立 node_id → result 的映射
const resultMap = new Map();
for (const r of results) {
if (r.node_id) resultMap.set(r.node_id, r);
}
// 处理每个新节点
for (const entry of newEntries) {
const result = resultMap.get(entry.id);
if (!result) {
// LLM 未返回此节点的结果fallback 为 keep
stats.kept++;
continue;
}
processOneResult(graph, entry, result, stats);
}
// 日志
const actionSummary = [];
if (stats.merged > 0) actionSummary.push(`合并 ${stats.merged}`);
if (stats.skipped > 0) actionSummary.push(`跳过 ${stats.skipped}`);
@@ -339,3 +332,122 @@ export async function consolidateMemories({
return stats;
}
/**
* 处理单个节点的整合结果
*/
function processOneResult(graph, entry, result, stats) {
const { id: newId, node: newNode } = entry;
// ── 处理 action ──
switch (result.action) {
case 'skip': {
console.log(`[ST-BME] 记忆整合: skip (重复) — ${newId}`);
newNode.archived = true;
stats.skipped++;
break;
}
case 'merge': {
const targetId = result.merge_target_id;
const targetNode = targetId ? getNode(graph, targetId) : null;
if (targetNode && !targetNode.archived) {
console.log(`[ST-BME] 记忆整合: merge ${newId}${targetId}`);
if (result.merged_fields && typeof result.merged_fields === 'object') {
for (const [key, value] of Object.entries(result.merged_fields)) {
if (value != null && value !== '') {
targetNode.fields[key] = value;
}
}
} else {
for (const [key, value] of Object.entries(newNode.fields)) {
if (value != null && value !== '' && !targetNode.fields[key]) {
targetNode.fields[key] = value;
}
}
}
if (Number.isFinite(newNode.seq) && newNode.seq > (targetNode.seq || 0)) {
targetNode.seq = newNode.seq;
}
targetNode.embedding = null;
newNode.archived = true;
stats.merged++;
} else {
console.warn(`[ST-BME] 记忆整合: merge target ${targetId} 不存在,回退为 keep`);
stats.kept++;
}
break;
}
case 'keep':
default: {
stats.kept++;
break;
}
}
// ── 处理 evolution ──
const evolution = result.evolution;
if (evolution?.should_evolve && !newNode.archived) {
stats.evolved++;
console.log(`[ST-BME] 记忆整合/进化触发: ${result.reason || '(无理由)'}`);
if (Array.isArray(evolution.connections)) {
for (const targetId of evolution.connections) {
if (!getNode(graph, targetId)) continue;
const edge = createEdge({
fromId: newId,
toId: targetId,
relation: 'related',
strength: 0.7,
});
if (addEdge(graph, edge)) {
stats.connections++;
}
}
}
if (Array.isArray(evolution.neighbor_updates)) {
for (const update of evolution.neighbor_updates) {
if (!update.nodeId) continue;
const oldNode = getNode(graph, update.nodeId);
if (!oldNode || oldNode.archived) continue;
let changed = false;
if (update.newContext && typeof update.newContext === 'string') {
if (oldNode.fields.state !== undefined) {
oldNode.fields.state = update.newContext;
changed = true;
} else if (oldNode.fields.summary !== undefined) {
oldNode.fields.summary = update.newContext;
changed = true;
} else if (oldNode.fields.core_note !== undefined) {
oldNode.fields.core_note = update.newContext;
changed = true;
}
}
if (update.newTags && Array.isArray(update.newTags)) {
oldNode.clusters = update.newTags;
changed = true;
}
if (changed) {
oldNode.embedding = null;
if (!oldNode._evolutionHistory) oldNode._evolutionHistory = [];
oldNode._evolutionHistory.push({
triggeredBy: newId,
timestamp: Date.now(),
reason: result.reason || '',
});
stats.updates++;
}
}
}
}
}

View File

@@ -67,17 +67,14 @@ const DEFAULT_PROMPTS = {
"- 反向更新旧记忆",
"",
"输出严格 JSON",
"{",
' "action": "keep"|"merge"|"skip",',
' "merge_target_id": "旧节点ID",',
' "merged_fields": {},',
' "reason": "理由",',
' "evolution": {',
' "should_evolve": true/false,',
' "connections": ["旧记忆ID"],',
' "neighbor_updates": [{"nodeId": "旧节点ID", "newContext": "修正描述", "newTags": ["标签"]}]',
'{ "results": [',
' { "node_id": "新记忆节点ID",',
' "action": "keep"|"merge"|"skip",',
' "merge_target_id": "旧节点ID (仅merge)",',
' "reason": "理由",',
' "evolution": { "should_evolve": true/false, "connections": ["旧记忆ID"], "neighbor_updates": [...] }',
" }",
"}",
"] }",
].join("\n"),
compress: [