fix: 重构生成前注入与历史回滚链路

This commit is contained in:
Youzini-afk
2026-03-25 02:45:43 +08:00
parent 58c43d638b
commit 7bfe37e964
6 changed files with 706 additions and 132 deletions

View File

@@ -1,6 +1,7 @@
// ST-BME: 运行时状态与历史恢复辅助
const BATCH_JOURNAL_LIMIT = 24;
const BATCH_JOURNAL_LIMIT = 96;
export const BATCH_JOURNAL_VERSION = 2;
export function buildVectorCollectionId(chatId) {
return `st-bme::${chatId || "unknown-chat"}`;
@@ -13,6 +14,8 @@ export function createDefaultHistoryState(chatId = "") {
processedMessageHashes: {},
historyDirtyFrom: null,
lastMutationReason: "",
lastMutationSource: "",
extractionCount: 0,
lastRecoveryResult: null,
};
}
@@ -61,6 +64,12 @@ export function normalizeGraphRuntimeState(graph, chatId = "") {
? graph.lastProcessedSeq
: -1;
}
if (!Number.isFinite(historyState.extractionCount)) {
historyState.extractionCount = 0;
}
if (typeof historyState.lastMutationSource !== "string") {
historyState.lastMutationSource = "";
}
if (
!historyState.processedMessageHashes ||
@@ -207,7 +216,7 @@ export function detectHistoryMutation(chat, historyState) {
return { dirty: false, earliestAffectedFloor: null, reason: "" };
}
export function markHistoryDirty(graph, floor, reason = "") {
export function markHistoryDirty(graph, floor, reason = "", source = "") {
normalizeGraphRuntimeState(graph, graph?.historyState?.chatId || "");
const currentDirtyFrom = graph.historyState.historyDirtyFrom;
@@ -219,11 +228,13 @@ export function markHistoryDirty(graph, floor, reason = "") {
? Math.min(currentDirtyFrom, floor)
: floor;
graph.historyState.lastMutationReason = String(reason || "").trim();
graph.historyState.lastMutationSource = String(source || "").trim();
graph.historyState.lastRecoveryResult = {
status: "pending",
at: Date.now(),
fromFloor: graph.historyState.historyDirtyFrom,
reason: graph.historyState.lastMutationReason,
detectionSource: graph.historyState.lastMutationSource || "",
};
}
@@ -231,6 +242,7 @@ export function clearHistoryDirty(graph, result = null) {
normalizeGraphRuntimeState(graph, graph?.historyState?.chatId || "");
graph.historyState.historyDirtyFrom = null;
graph.historyState.lastMutationReason = "";
graph.historyState.lastMutationSource = "";
if (result) {
graph.historyState.lastRecoveryResult = result;
}
@@ -252,6 +264,32 @@ function hasMeaningfulEdgeChange(beforeEdge, afterEdge) {
return JSON.stringify(beforeEdge) !== JSON.stringify(afterEdge);
}
/**
 * Deep-copy a plain, JSON-serializable value (objects/arrays of
 * primitives). Functions, Dates, Maps etc. are not supported — callers
 * here only pass hash maps and plain state records.
 *
 * @param {*} value - plain value to clone
 * @returns {*} an independent deep copy, or the value itself when nullish
 */
function clonePlain(value) {
  // JSON.stringify(undefined) yields undefined, and JSON.parse(undefined)
  // throws a SyntaxError — pass nullish values through unchanged instead
  // of crashing. (null round-trips to null anyway.)
  if (value === undefined || value === null) return value;
  return JSON.parse(JSON.stringify(value));
}
/**
 * Capture the pre-batch runtime-state fields a reverse rollback must
 * restore, taken from the pre-batch graph snapshot.
 *
 * @param {object} snapshotBefore - graph snapshot taken before the batch ran
 * @param {object} [meta] - optional overrides; `extractionCountBefore`
 *   (finite number) wins over the snapshot's own extractionCount
 * @returns {object} plain state record consumed by applyJournalStateBefore
 */
function buildJournalStateBefore(snapshotBefore, meta = {}) {
  const history = snapshotBefore?.historyState;
  // Prefer the new historyState floor; fall back to the legacy
  // lastProcessedSeq mirror, then to the "nothing processed" sentinel.
  const floor =
    history?.lastProcessedAssistantFloor ?? snapshotBefore?.lastProcessedSeq ?? -1;
  const dirtyFrom = history?.historyDirtyFrom;
  const recall = snapshotBefore?.lastRecallResult;
  let extractionCount = meta.extractionCountBefore;
  if (!Number.isFinite(extractionCount)) {
    extractionCount = history?.extractionCount ?? 0;
  }
  return {
    lastProcessedAssistantFloor: floor,
    processedMessageHashes: clonePlain(history?.processedMessageHashes || {}),
    historyDirtyFrom: Number.isFinite(dirtyFrom) ? dirtyFrom : null,
    vectorIndexState: clonePlain(snapshotBefore?.vectorIndexState || {}),
    lastRecallResult: Array.isArray(recall) ? [...recall] : null,
    extractionCount,
  };
}
export function createBatchJournalEntry(snapshotBefore, snapshotAfter, meta = {}) {
const beforeNodes = buildNodeMap(snapshotBefore?.nodes || []);
const afterNodes = buildNodeMap(snapshotAfter?.nodes || []);
@@ -260,9 +298,8 @@ export function createBatchJournalEntry(snapshotBefore, snapshotAfter, meta = {}
const createdNodeIds = [];
const createdEdgeIds = [];
const updatedNodeSnapshots = [];
const archivedNodeSnapshots = [];
const invalidatedEdgeSnapshots = [];
const previousNodeSnapshots = [];
const previousEdgeSnapshots = [];
for (const [nodeId, afterNode] of afterNodes.entries()) {
if (!beforeNodes.has(nodeId)) {
@@ -272,11 +309,7 @@ export function createBatchJournalEntry(snapshotBefore, snapshotAfter, meta = {}
const beforeNode = beforeNodes.get(nodeId);
if (!hasMeaningfulNodeChange(beforeNode, afterNode)) continue;
updatedNodeSnapshots.push(cloneGraphSnapshot(beforeNode));
if (beforeNode.archived !== afterNode.archived) {
archivedNodeSnapshots.push(cloneGraphSnapshot(beforeNode));
}
previousNodeSnapshots.push(cloneGraphSnapshot(beforeNode));
}
for (const [edgeId, afterEdge] of afterEdges.entries()) {
@@ -287,31 +320,34 @@ export function createBatchJournalEntry(snapshotBefore, snapshotAfter, meta = {}
const beforeEdge = beforeEdges.get(edgeId);
if (!hasMeaningfulEdgeChange(beforeEdge, afterEdge)) continue;
if (
beforeEdge.invalidAt !== afterEdge.invalidAt ||
beforeEdge.expiredAt !== afterEdge.expiredAt
) {
invalidatedEdgeSnapshots.push(cloneGraphSnapshot(beforeEdge));
}
previousEdgeSnapshots.push(cloneGraphSnapshot(beforeEdge));
}
return {
const entry = {
id: `batch-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
journalVersion: BATCH_JOURNAL_VERSION,
createdAt: Date.now(),
processedRange: meta.processedRange || [-1, -1],
createdNodeIds,
createdEdgeIds,
updatedNodeSnapshots,
archivedNodeSnapshots,
invalidatedEdgeSnapshots,
vectorHashesInserted: Array.isArray(meta.vectorHashesInserted)
? [...new Set(meta.vectorHashesInserted)]
: [],
previousNodeSnapshots,
previousEdgeSnapshots,
stateBefore: buildJournalStateBefore(snapshotBefore, meta),
vectorDelta: {
insertedHashes: Array.isArray(meta.vectorHashesInserted)
? [...new Set(meta.vectorHashesInserted)]
: [],
},
postProcessArtifacts: Array.isArray(meta.postProcessArtifacts)
? meta.postProcessArtifacts
: [],
snapshotBefore,
};
if (meta.includeLegacySnapshotBefore) {
entry.snapshotBefore = snapshotBefore;
}
return entry;
}
export function appendBatchJournal(graph, entry) {
@@ -322,6 +358,99 @@ export function appendBatchJournal(graph, entry) {
}
}
/**
 * Insert `item` into `list`, replacing the first existing element that
 * shares its `id`. Mutates `list` in place.
 *
 * @param {Array<{id: *}>} list - array to modify
 * @param {{id: *}} item - element to insert or substitute
 */
function upsertById(list, item) {
  for (let i = 0; i < list.length; i += 1) {
    if (list[i].id === item.id) {
      list[i] = item;
      return;
    }
  }
  list.push(item);
}
/**
 * Scrub dangling references after a rollback: parent/prev/next links that
 * point at missing nodes become null, childIds lose unknown entries, and
 * edges whose endpoints no longer exist are dropped. Mutates `graph`.
 *
 * @param {object} graph - graph with `nodes` and `edges` arrays
 */
function sanitizeGraphReferences(graph) {
  const knownIds = new Set();
  for (const node of graph?.nodes || []) {
    knownIds.add(node.id);
  }
  // Null out a single reference unless it targets a surviving node.
  const keepId = (id) => (knownIds.has(id) ? id : null);
  graph.nodes = (graph.nodes || []).map((node) => {
    const childIds = Array.isArray(node.childIds)
      ? node.childIds.filter((id) => knownIds.has(id))
      : [];
    return {
      ...node,
      parentId: keepId(node.parentId),
      childIds,
      prevId: keepId(node.prevId),
      nextId: keepId(node.nextId),
    };
  });
  graph.edges = (graph.edges || []).filter(
    (edge) => knownIds.has(edge.fromId) && knownIds.has(edge.toId),
  );
}
// Restore the runtime-state portion of a journal rollback: rebuilds
// historyState / vectorIndexState / lastRecallResult on `graph` from the
// `stateBefore` payload captured by buildJournalStateBefore.
// Mutates `graph` in place.
function applyJournalStateBefore(graph, stateBefore = {}) {
  // Layer current state over fresh defaults so any field not explicitly
  // restored below keeps its present value (chatId in particular).
  const historyState = {
    ...createDefaultHistoryState(graph?.historyState?.chatId || ""),
    ...(graph.historyState || {}),
  };
  // Numeric fields are only taken from stateBefore when finite; otherwise
  // the current/default value is preserved.
  historyState.lastProcessedAssistantFloor = Number.isFinite(
    stateBefore.lastProcessedAssistantFloor,
  )
    ? stateBefore.lastProcessedAssistantFloor
    : historyState.lastProcessedAssistantFloor;
  historyState.processedMessageHashes = clonePlain(
    stateBefore.processedMessageHashes || {},
  );
  // A non-finite dirty marker means "not dirty" — normalize to null.
  historyState.historyDirtyFrom = Number.isFinite(stateBefore.historyDirtyFrom)
    ? stateBefore.historyDirtyFrom
    : null;
  historyState.extractionCount = Number.isFinite(stateBefore.extractionCount)
    ? stateBefore.extractionCount
    : historyState.extractionCount;
  graph.historyState = historyState;
  // NOTE(review): chatId is read from the historyState assigned just above,
  // so the restored vector index stays tied to the same chat.
  graph.vectorIndexState = {
    ...createDefaultVectorIndexState(graph?.historyState?.chatId || ""),
    ...clonePlain(stateBefore.vectorIndexState || {}),
  };
  graph.lastRecallResult = Array.isArray(stateBefore.lastRecallResult)
    ? [...stateBefore.lastRecallResult]
    : null;
  // Keep the legacy mirror field in sync with the restored floor.
  graph.lastProcessedSeq = historyState.lastProcessedAssistantFloor;
}
// Reverse a single extraction batch against `graph` using its journal
// entry: removes nodes/edges the batch created, restores the pre-batch
// snapshots of anything it modified, and rewinds the runtime state.
// Returns the mutated graph (or `graph` unchanged when either argument
// is missing).
export function rollbackBatch(graph, journal) {
  if (!graph || !journal) return graph;
  normalizeGraphRuntimeState(graph, graph?.historyState?.chatId || "");
  const createdNodeIds = new Set(journal.createdNodeIds || []);
  const createdEdgeIds = new Set(journal.createdEdgeIds || []);
  // v2 journals carry the full previous*Snapshots arrays; older entries
  // fall back to the legacy updated/archived/invalidated snapshot fields
  // (archived was recorded as a subset of updated in v1).
  const previousNodeSnapshots =
    journal.previousNodeSnapshots ||
    journal.updatedNodeSnapshots ||
    journal.archivedNodeSnapshots ||
    [];
  const previousEdgeSnapshots =
    journal.previousEdgeSnapshots ||
    journal.invalidatedEdgeSnapshots ||
    [];
  // Drop edges created by the batch, plus any edge touching a node the
  // batch created (those nodes are removed just below).
  graph.edges = (graph.edges || []).filter(
    (edge) =>
      !createdEdgeIds.has(edge.id) &&
      !createdNodeIds.has(edge.fromId) &&
      !createdNodeIds.has(edge.toId),
  );
  graph.nodes = (graph.nodes || []).filter((node) => !createdNodeIds.has(node.id));
  // Put back the pre-batch version of everything the batch modified.
  for (const nodeSnapshot of previousNodeSnapshots) {
    upsertById(graph.nodes, cloneGraphSnapshot(nodeSnapshot));
  }
  for (const edgeSnapshot of previousEdgeSnapshots) {
    upsertById(graph.edges, cloneGraphSnapshot(edgeSnapshot));
  }
  // Rewind processed floor / hashes / vector-index state, then scrub any
  // references left dangling by the removals above.
  applyJournalStateBefore(graph, journal.stateBefore || {});
  sanitizeGraphReferences(graph);
  return graph;
}
export function findJournalRecoveryPoint(graph, dirtyFromFloor) {
const journals = Array.isArray(graph?.batchJournal) ? graph.batchJournal : [];
const affectedIndex = journals.findIndex((journal) => {
@@ -333,14 +462,31 @@ export function findJournalRecoveryPoint(graph, dirtyFromFloor) {
if (affectedIndex < 0) return null;
const journal = journals[affectedIndex];
if (!journal?.snapshotBefore) return null;
const affectedJournals = journals.slice(affectedIndex);
const canReverse = affectedJournals.every(
(journal) => Number(journal?.journalVersion || 0) >= BATCH_JOURNAL_VERSION,
);
if (canReverse) {
return {
path: "reverse-journal",
affectedIndex,
affectedJournals: affectedJournals.map((journal) => cloneGraphSnapshot(journal)),
affectedBatchCount: affectedJournals.length,
};
}
return {
affectedIndex,
journal,
snapshotBefore: cloneGraphSnapshot(journal.snapshotBefore),
};
const journal = journals[affectedIndex];
if (journal?.snapshotBefore) {
return {
path: "legacy-snapshot",
affectedIndex,
journal: cloneGraphSnapshot(journal),
snapshotBefore: cloneGraphSnapshot(journal.snapshotBefore),
affectedBatchCount: affectedJournals.length,
};
}
return null;
}
export function buildRecoveryResult(status, extra = {}) {