feat: 同批次节点默认弱关联边 + LLM 可增强/移除 + prompt 更新

- extractor: 批次操作完成后统一处理 links,再补默认弱 related 边(0.25)
- extractor: 支持 remove/delete/unlink/invalidate 语义显式移除边
- extractor: update 操作现在也能处理 links
- extractor: 边计数改为仅统计真正新增的边
- prompt: 输出格式示例加 links 字段,补 links/remove 语法说明
- prompt: 行为规则加关联边使用规范段落
- tests: 3 条回归覆盖默认弱边、显式覆盖、显式移除
This commit is contained in:
Youzini-afk
2026-04-20 20:27:15 +08:00
parent 4a95158fc1
commit d3c199fee1
3 changed files with 385 additions and 60 deletions

View File

@@ -1239,6 +1239,8 @@ export async function extractMemories({
const newNodeIds = []; // v2: 收集新建节点 ID用于进化引擎
const updatedNodeIds = [];
const refMap = new Map();
const pendingLinkJobs = [];
const suppressedDefaultPairKeys = new Set();
const operationErrors = [];
const normalizedBatchStoryTime = normalizedResult?.batchStoryTime || null;
@@ -1246,7 +1248,7 @@ export async function extractMemories({
try {
switch (op.action) {
case "create": {
const createdId = handleCreate(
const createResult = handleCreate(
graph,
op,
currentSeq,
@@ -1258,7 +1260,15 @@ export async function extractMemories({
ownershipWarnings,
normalizedBatchStoryTime,
);
if (createdId) newNodeIds.push(createdId);
if (createResult?.nodeId) {
queueOperationLinks(pendingLinkJobs, createResult.nodeId, op.links);
}
if (createResult?.created === true && createResult.nodeId) {
newNodeIds.push(createResult.nodeId);
}
if (createResult?.updated === true && createResult.nodeId) {
updatedNodeIds.push(createResult.nodeId);
}
break;
}
case "update":
@@ -1273,7 +1283,10 @@ export async function extractMemories({
ownershipWarnings,
normalizedBatchStoryTime,
);
if (updatedNodeId) updatedNodeIds.push(updatedNodeId);
if (updatedNodeId) {
updatedNodeIds.push(updatedNodeId);
queueOperationLinks(pendingLinkJobs, updatedNodeId, op.links);
}
}
break;
case "delete":
@@ -1305,6 +1318,19 @@ export async function extractMemories({
};
}
applyPendingLinks(graph, pendingLinkJobs, refMap, stats, {
suppressedDefaultPairKeys,
});
applyDefaultBatchEdges(
graph,
[...new Set([...newNodeIds, ...updatedNodeIds])],
stats,
settings,
{
suppressedDefaultPairKeys,
},
);
// 为新建节点生成 embedding。失败不应回滚整批图谱写入。
try {
await generateNodeEmbeddings(graph, embeddingConfig, signal);
@@ -1416,11 +1442,7 @@ function handleCreate(
if (op.ref) refMap.set(op.ref, existing.id);
// 处理关联边
if (op.links) {
handleLinks(graph, existing.id, op.links, refMap, stats);
}
return null;
return { nodeId: existing.id, created: false, updated: true };
}
}
@@ -1443,12 +1465,7 @@ function handleCreate(
refMap.set(op.ref, node.id);
}
// 处理关联边
if (op.links) {
handleLinks(graph, node.id, op.links, refMap, stats);
}
return node.id;
return { nodeId: node.id, created: true, updated: false };
}
/**
@@ -1541,9 +1558,7 @@ function handleUpdate(
strength: op.temporalStrength ?? 0.95,
edgeType: 0,
});
if (addEdge(graph, temporalEdge)) {
stats.newEdges++;
}
addEdgeWithStats(graph, temporalEdge, stats);
}
if (changeSummary) {
@@ -1583,14 +1598,209 @@ function handleUpdate(
edgeType: 0,
scope: updateEventNode.scope,
});
if (addEdge(graph, updateEdge)) {
stats.newEdges++;
}
addEdgeWithStats(graph, updateEdge, stats);
}
}
return updated ? op.nodeId : "";
}
function addEdgeWithStats(graph, edge, stats) {
const addedEdge = addEdge(graph, edge);
if (addedEdge === edge) {
stats.newEdges++;
}
return addedEdge;
}
function buildUndirectedPairKey(leftId, rightId) {
const normalizedLeft = String(leftId || "").trim();
const normalizedRight = String(rightId || "").trim();
if (!normalizedLeft || !normalizedRight || normalizedLeft === normalizedRight) {
return "";
}
return [normalizedLeft, normalizedRight].sort().join("::");
}
function queueOperationLinks(pendingLinkJobs, sourceId, links) {
if (!sourceId || !Array.isArray(links) || links.length === 0) return;
pendingLinkJobs.push({
sourceId: String(sourceId || ""),
links,
});
}
function resolveLinkTargetId(link = {}, refMap = new Map()) {
let targetId = link.targetNodeId || null;
if (!targetId && link.targetRef) {
targetId = refMap.get(link.targetRef);
}
return targetId ? String(targetId) : "";
}
function shouldInvalidateLink(link = {}) {
const action = String(link?.action || "").trim().toLowerCase();
return (
link?.remove === true ||
link?.delete === true ||
link?.unlink === true ||
link?.invalidate === true ||
action === "remove" ||
action === "delete" ||
action === "unlink" ||
action === "invalidate"
);
}
function isEdgeCurrentlyActive(edge) {
if (!edge) return false;
if (edge.invalidAt) return false;
if (edge.expiredAt) return false;
return true;
}
function resolveLinkRelation(link = {}, { fallback = "related" } = {}) {
const relation = String(link?.relation || "").trim();
if (RELATION_TYPES.includes(relation)) {
return relation;
}
return fallback;
}
function invalidateLinksBetween(graph, sourceId, targetId, relation = "related") {
if (!sourceId || !targetId) return 0;
let changed = 0;
for (const edge of Array.isArray(graph?.edges) ? graph.edges : []) {
if (!isEdgeCurrentlyActive(edge)) continue;
if (edge.relation !== relation) continue;
const sameDirection = edge.fromId === sourceId && edge.toId === targetId;
const reverseDirection = edge.fromId === targetId && edge.toId === sourceId;
if (!sameDirection && !reverseDirection) continue;
invalidateEdge(edge);
changed += 1;
}
return changed;
}
function handleLinks(graph, sourceId, links, refMap, stats, options = {}) {
const suppressedDefaultPairKeys =
options?.suppressedDefaultPairKeys instanceof Set
? options.suppressedDefaultPairKeys
: null;
const sourceNode = getNode(graph, sourceId);
const sourceScope = normalizeMemoryScope(sourceNode?.scope);
for (const link of links) {
const targetId = resolveLinkTargetId(link, refMap);
if (!targetId) continue;
if (shouldInvalidateLink(link)) {
const relation = resolveLinkRelation(link, { fallback: "related" });
if (suppressedDefaultPairKeys && relation === "related") {
const pairKey = buildUndirectedPairKey(sourceId, targetId);
if (pairKey) {
suppressedDefaultPairKeys.add(pairKey);
}
}
invalidateLinksBetween(graph, sourceId, targetId, relation);
continue;
}
const relation = resolveLinkRelation(link, { fallback: "related" });
const edgeType = relation === "contradicts" ? 255 : 0;
const edge = createEdge({
fromId: sourceId,
toId: targetId,
relation,
strength: link.strength ?? 0.8,
edgeType,
scope: link.scope || sourceScope,
});
addEdgeWithStats(graph, edge, stats);
}
}
function applyPendingLinks(
graph,
pendingLinkJobs,
refMap,
stats,
{ suppressedDefaultPairKeys = null } = {},
) {
for (const job of Array.isArray(pendingLinkJobs) ? pendingLinkJobs : []) {
if (!job?.sourceId || !Array.isArray(job?.links) || job.links.length === 0) {
continue;
}
handleLinks(graph, job.sourceId, job.links, refMap, stats, {
suppressedDefaultPairKeys,
});
}
}
function hasActiveEdgeBetween(graph, leftId, rightId) {
if (!leftId || !rightId) return false;
return (Array.isArray(graph?.edges) ? graph.edges : []).some((edge) => {
if (!isEdgeCurrentlyActive(edge)) return false;
return (
(edge.fromId === leftId && edge.toId === rightId) ||
(edge.fromId === rightId && edge.toId === leftId)
);
});
}
function applyDefaultBatchEdges(
graph,
nodeIds,
stats,
settings = {},
{ suppressedDefaultPairKeys = null } = {},
) {
if (settings?.extractDefaultBatchRelatedEdges === false) {
return 0;
}
const strength = Math.max(
0,
Math.min(1, Number(settings?.extractDefaultBatchRelatedEdgeStrength) || 0.25),
);
if (strength <= 0) return 0;
const orderedIds = [...new Set(
(Array.isArray(nodeIds) ? nodeIds : [])
.map((nodeId) => String(nodeId || "").trim())
.filter(Boolean),
)].filter((nodeId) => {
const node = getNode(graph, nodeId);
return Boolean(node) && node.archived !== true;
});
let createdCount = 0;
for (let leftIndex = 0; leftIndex < orderedIds.length; leftIndex += 1) {
for (let rightIndex = leftIndex + 1; rightIndex < orderedIds.length; rightIndex += 1) {
const sourceId = orderedIds[leftIndex];
const targetId = orderedIds[rightIndex];
const pairKey = buildUndirectedPairKey(sourceId, targetId);
if (suppressedDefaultPairKeys?.has(pairKey)) {
continue;
}
if (hasActiveEdgeBetween(graph, sourceId, targetId)) {
continue;
}
const sourceNode = getNode(graph, sourceId);
const edge = createEdge({
fromId: sourceId,
toId: targetId,
relation: "related",
strength,
edgeType: 0,
scope: normalizeMemoryScope(sourceNode?.scope),
});
if (addEdgeWithStats(graph, edge, stats) === edge) {
createdCount += 1;
}
}
}
return createdCount;
}
function buildFieldChangeSummary(previousFields = {}, nextFields = {}) {
const changes = [];
const keys = new Set([
@@ -1622,44 +1832,6 @@ function handleDelete(graph, op, stats) {
}
}
/**
* 处理关联边
*/
function handleLinks(graph, sourceId, links, refMap, stats) {
const sourceNode = getNode(graph, sourceId);
const sourceScope = normalizeMemoryScope(sourceNode?.scope);
for (const link of links) {
let targetId = link.targetNodeId || null;
// 通过 ref 解析目标节点
if (!targetId && link.targetRef) {
targetId = refMap.get(link.targetRef);
}
if (!targetId) continue;
// 验证关系类型
const relation = RELATION_TYPES.includes(link.relation)
? link.relation
: "related";
const edgeType = relation === "contradicts" ? 255 : 0;
const edge = createEdge({
fromId: sourceId,
toId: targetId,
relation,
strength: link.strength ?? 0.8,
edgeType,
scope: link.scope || sourceScope,
});
if (addEdge(graph, edge)) {
stats.newEdges++;
}
}
}
function resolveOperationScope(
graph,
op,

File diff suppressed because one or more lines are too long

View File

@@ -2767,6 +2767,156 @@ async function testExtractorPropagatesLlmFailureReason() {
}
}
async function testExtractorAddsWeakDefaultRelatedEdgeForSameBatchNodes() {
const graph = createEmptyGraph();
const restoreOverrides = pushTestOverrides({
llm: {
async callLLMForJSON() {
return {
operations: [
{
type: "event",
id: "evt1",
title: "钟楼初见",
summary: "两人在钟楼第一次正式见面。",
participants: "艾琳, 用户",
},
{
type: "event",
id: "evt2",
title: "钟楼密谈",
summary: "见面后立刻进入短暂密谈。",
participants: "艾琳, 用户",
},
],
};
},
},
});
try {
const result = await extractMemories({
graph,
messages: [{ seq: 10, role: "assistant", content: "测试批次默认弱连边" }],
startSeq: 10,
endSeq: 10,
schema,
embeddingConfig: null,
settings: {},
});
assert.equal(result.success, true);
assert.equal(result.newNodes, 2);
assert.equal(result.newEdges, 1);
const activeEdges = graph.edges.filter((edge) => !edge.invalidAt && !edge.expiredAt);
assert.equal(activeEdges.length, 1);
assert.equal(activeEdges[0]?.relation, "related");
assert.equal(activeEdges[0]?.strength, 0.25);
} finally {
restoreOverrides();
}
}
async function testExtractorExplicitLinksOverrideDefaultBatchWeakEdge() {
const graph = createEmptyGraph();
const restoreOverrides = pushTestOverrides({
llm: {
async callLLMForJSON() {
return {
operations: [
{
type: "event",
id: "evt1",
title: "档案室发现",
summary: "发现了一份关键档案。",
participants: "艾琳",
links: [{ targetRef: "evt2", relation: "related", strength: 0.91 }],
},
{
type: "event",
id: "evt2",
title: "档案内容核对",
summary: "紧接着对档案内容进行核对。",
participants: "艾琳",
},
],
};
},
},
});
try {
const result = await extractMemories({
graph,
messages: [{ seq: 11, role: "assistant", content: "测试显式 links 覆盖默认弱边" }],
startSeq: 11,
endSeq: 11,
schema,
embeddingConfig: null,
settings: {},
});
assert.equal(result.success, true);
assert.equal(result.newNodes, 2);
assert.equal(result.newEdges, 1);
const activeEdges = graph.edges.filter((edge) => !edge.invalidAt && !edge.expiredAt);
assert.equal(activeEdges.length, 1);
assert.equal(activeEdges[0]?.relation, "related");
assert.equal(activeEdges[0]?.strength, 0.91);
} finally {
restoreOverrides();
}
}
async function testExtractorExplicitRemoveSuppressesDefaultBatchWeakEdge() {
const graph = createEmptyGraph();
const restoreOverrides = pushTestOverrides({
llm: {
async callLLMForJSON() {
return {
operations: [
{
type: "event",
id: "evt1",
title: "花园交错",
summary: "两条线索在花园中短暂交错。",
participants: "艾琳, 守卫",
links: [{ targetRef: "evt2", relation: "related", remove: true }],
},
{
type: "event",
id: "evt2",
title: "花园分流",
summary: "随后两条线索各自分流。",
participants: "艾琳, 守卫",
},
],
};
},
},
});
try {
const result = await extractMemories({
graph,
messages: [{ seq: 12, role: "assistant", content: "测试显式移除默认弱边" }],
startSeq: 12,
endSeq: 12,
schema,
embeddingConfig: null,
settings: {},
});
assert.equal(result.success, true);
assert.equal(result.newNodes, 2);
assert.equal(result.newEdges, 0);
const activeEdges = graph.edges.filter((edge) => !edge.invalidAt && !edge.expiredAt);
assert.equal(activeEdges.length, 0);
} finally {
restoreOverrides();
}
}
async function testConsolidatorMergeUpdatesSeqRange() {
const graph = createEmptyGraph();
const target = createNode({
@@ -7208,6 +7358,9 @@ await testExtractorFailsOnUnknownOperation();
await testExtractorNormalizesFlatCreateOperation();
await testExtractorNormalizesArrayPayloadAndPreservesScopeField();
await testExtractorPropagatesLlmFailureReason();
await testExtractorAddsWeakDefaultRelatedEdgeForSameBatchNodes();
await testExtractorExplicitLinksOverrideDefaultBatchWeakEdge();
await testExtractorExplicitRemoveSuppressesDefaultBatchWeakEdge();
await testConsolidatorMergeUpdatesSeqRange();
await testConsolidatorMergeFallbackKeepsNodeWhenTargetMissing();
await testBatchJournalVectorDeltaCapturesRecoveryFields();