fix: 多项修复与优化

This commit is contained in:
Youzini-afk
2026-03-26 02:09:18 +08:00
parent 71011400d2
commit 0e590a6256
7 changed files with 2442 additions and 649 deletions

View File

@@ -17,6 +17,7 @@ export function createDefaultHistoryState(chatId = "") {
lastMutationSource: "",
extractionCount: 0,
lastRecoveryResult: null,
lastBatchStatus: null,
};
}
@@ -29,6 +30,9 @@ export function createDefaultVectorIndexState(chatId = "") {
hashToNodeId: {},
nodeToHash: {},
dirty: false,
replayRequiredNodeIds: [],
dirtyReason: "",
pendingRepairFromFloor: null,
lastSyncAt: 0,
lastStats: {
total: 0,
@@ -60,7 +64,9 @@ export function normalizeGraphRuntimeState(graph, chatId = "") {
historyState.chatId = chatId || historyState.chatId || "";
if (!Number.isFinite(historyState.lastProcessedAssistantFloor)) {
historyState.lastProcessedAssistantFloor = Number.isFinite(graph.lastProcessedSeq)
historyState.lastProcessedAssistantFloor = Number.isFinite(
graph.lastProcessedSeq,
)
? graph.lastProcessedSeq
: -1;
}
@@ -70,6 +76,20 @@ export function normalizeGraphRuntimeState(graph, chatId = "") {
if (typeof historyState.lastMutationSource !== "string") {
historyState.lastMutationSource = "";
}
if (
!historyState.lastBatchStatus ||
typeof historyState.lastBatchStatus !== "object" ||
Array.isArray(historyState.lastBatchStatus)
) {
historyState.lastBatchStatus = null;
} else if (
typeof historyState.lastBatchStatus.historyAdvanced !== "boolean"
) {
historyState.lastBatchStatus = {
...historyState.lastBatchStatus,
historyAdvanced: false,
};
}
if (
!historyState.processedMessageHashes ||
@@ -93,17 +113,42 @@ export function normalizeGraphRuntimeState(graph, chatId = "") {
) {
vectorIndexState.nodeToHash = {};
}
if (!vectorIndexState.lastStats || typeof vectorIndexState.lastStats !== "object") {
vectorIndexState.lastStats = createDefaultVectorIndexState(chatId).lastStats;
if (
!vectorIndexState.lastStats ||
typeof vectorIndexState.lastStats !== "object"
) {
vectorIndexState.lastStats =
createDefaultVectorIndexState(chatId).lastStats;
}
if (!Array.isArray(vectorIndexState.replayRequiredNodeIds)) {
vectorIndexState.replayRequiredNodeIds = [];
} else {
vectorIndexState.replayRequiredNodeIds = [
...new Set(vectorIndexState.replayRequiredNodeIds.filter(Boolean)),
];
}
if (typeof vectorIndexState.dirtyReason !== "string") {
vectorIndexState.dirtyReason = "";
}
if (!Number.isFinite(vectorIndexState.pendingRepairFromFloor)) {
vectorIndexState.pendingRepairFromFloor = null;
}
const previousCollectionId = vectorIndexState.collectionId;
vectorIndexState.collectionId = buildVectorCollectionId(chatId || historyState.chatId);
vectorIndexState.collectionId = buildVectorCollectionId(
chatId || historyState.chatId,
);
if (previousCollectionId && previousCollectionId !== vectorIndexState.collectionId) {
if (
previousCollectionId &&
previousCollectionId !== vectorIndexState.collectionId
) {
vectorIndexState.hashToNodeId = {};
vectorIndexState.nodeToHash = {};
vectorIndexState.replayRequiredNodeIds = [];
vectorIndexState.dirty = true;
vectorIndexState.dirtyReason = "chat-id-changed";
vectorIndexState.pendingRepairFromFloor = 0;
vectorIndexState.lastWarning = "聊天标识变化,向量索引已标记为待重建";
}
@@ -155,7 +200,10 @@ export function buildMessageHash(message) {
return String(stableHashString(payload));
}
export function snapshotProcessedMessageHashes(chat, lastProcessedAssistantFloor) {
export function snapshotProcessedMessageHashes(
chat,
lastProcessedAssistantFloor,
) {
const result = {};
if (!Array.isArray(chat) || lastProcessedAssistantFloor < 0) {
return result;
@@ -268,6 +316,113 @@ function clonePlain(value) {
return JSON.parse(JSON.stringify(value));
}
function normalizeStringArray(values) {
if (!Array.isArray(values)) return [];
return [...new Set(values.filter(Boolean).map((value) => String(value)))];
}
function normalizeMappingArray(values) {
if (!Array.isArray(values)) return [];
const seen = new Set();
const mappings = [];
for (const entry of values) {
if (!entry || typeof entry !== "object") continue;
const nodeId = entry.nodeId ? String(entry.nodeId) : "";
const previousHash = entry.previousHash ? String(entry.previousHash) : "";
const nextHash = entry.nextHash ? String(entry.nextHash) : "";
if (!nodeId && !previousHash && !nextHash) continue;
const key = JSON.stringify([nodeId, previousHash, nextHash]);
if (seen.has(key)) continue;
seen.add(key);
mappings.push({ nodeId, previousHash, nextHash });
}
return mappings;
}
function buildVectorDelta(snapshotBefore, snapshotAfter, meta = {}) {
const beforeState = snapshotBefore?.vectorIndexState || {};
const afterState = snapshotAfter?.vectorIndexState || {};
const beforeNodeToHash = beforeState.nodeToHash || {};
const afterNodeToHash = afterState.nodeToHash || {};
const beforeHashSet = new Set(
Object.values(beforeState.hashToNodeId || {}).filter(Boolean),
);
const afterHashSet = new Set(
Object.values(afterState.hashToNodeId || {}).filter(Boolean),
);
const insertedHashes = new Set(
normalizeStringArray(meta.vectorHashesInserted),
);
const removedHashes = new Set(normalizeStringArray(meta.vectorHashesRemoved));
const touchedNodeIds = new Set(
normalizeStringArray(meta.vectorTouchedNodeIds),
);
const replayRequiredNodeIds = new Set(
normalizeStringArray(meta.vectorReplayRequiredNodeIds),
);
const backendDeleteHashes = new Set(
normalizeStringArray(meta.vectorBackendDeleteHashes),
);
const replacedMappings = normalizeMappingArray(meta.vectorReplacedMappings);
const nodeIds = new Set([
...Object.keys(beforeNodeToHash),
...Object.keys(afterNodeToHash),
]);
for (const hash of Object.keys(afterState.hashToNodeId || {})) {
if (!beforeHashSet.has(hash)) insertedHashes.add(hash);
}
for (const hash of Object.keys(beforeState.hashToNodeId || {})) {
if (!afterHashSet.has(hash)) removedHashes.add(hash);
}
for (const nodeId of nodeIds) {
const previousHash = beforeNodeToHash[nodeId]
? String(beforeNodeToHash[nodeId])
: "";
const nextHash = afterNodeToHash[nodeId]
? String(afterNodeToHash[nodeId])
: "";
if (previousHash === nextHash) continue;
touchedNodeIds.add(String(nodeId));
if (previousHash) {
removedHashes.add(previousHash);
backendDeleteHashes.add(previousHash);
}
if (nextHash) {
insertedHashes.add(nextHash);
}
if (previousHash || nextHash) {
const key = JSON.stringify([String(nodeId), previousHash, nextHash]);
const exists = replacedMappings.some(
(entry) =>
JSON.stringify([entry.nodeId, entry.previousHash, entry.nextHash]) ===
key,
);
if (!exists) {
replacedMappings.push({
nodeId: String(nodeId),
previousHash,
nextHash,
});
}
}
}
for (const nodeId of normalizeStringArray(afterState.replayRequiredNodeIds)) {
replayRequiredNodeIds.add(nodeId);
}
return {
insertedHashes: [...insertedHashes],
removedHashes: [...removedHashes],
replacedMappings,
touchedNodeIds: [...touchedNodeIds],
replayRequiredNodeIds: [...replayRequiredNodeIds],
backendDeleteHashes: [...backendDeleteHashes],
};
}
function buildJournalStateBefore(snapshotBefore, meta = {}) {
return {
lastProcessedAssistantFloor:
@@ -277,7 +432,9 @@ function buildJournalStateBefore(snapshotBefore, meta = {}) {
processedMessageHashes: clonePlain(
snapshotBefore?.historyState?.processedMessageHashes || {},
),
historyDirtyFrom: Number.isFinite(snapshotBefore?.historyState?.historyDirtyFrom)
historyDirtyFrom: Number.isFinite(
snapshotBefore?.historyState?.historyDirtyFrom,
)
? snapshotBefore.historyState.historyDirtyFrom
: null,
vectorIndexState: clonePlain(snapshotBefore?.vectorIndexState || {}),
@@ -286,11 +443,15 @@ function buildJournalStateBefore(snapshotBefore, meta = {}) {
: null,
extractionCount: Number.isFinite(meta.extractionCountBefore)
? meta.extractionCountBefore
: snapshotBefore?.historyState?.extractionCount ?? 0,
: (snapshotBefore?.historyState?.extractionCount ?? 0),
};
}
export function createBatchJournalEntry(snapshotBefore, snapshotAfter, meta = {}) {
export function createBatchJournalEntry(
snapshotBefore,
snapshotAfter,
meta = {},
) {
const beforeNodes = buildNodeMap(snapshotBefore?.nodes || []);
const afterNodes = buildNodeMap(snapshotAfter?.nodes || []);
const beforeEdges = buildEdgeMap(snapshotBefore?.edges || []);
@@ -333,11 +494,7 @@ export function createBatchJournalEntry(snapshotBefore, snapshotAfter, meta = {}
previousNodeSnapshots,
previousEdgeSnapshots,
stateBefore: buildJournalStateBefore(snapshotBefore, meta),
vectorDelta: {
insertedHashes: Array.isArray(meta.vectorHashesInserted)
? [...new Set(meta.vectorHashesInserted)]
: [],
},
vectorDelta: buildVectorDelta(snapshotBefore, snapshotAfter, meta),
postProcessArtifacts: Array.isArray(meta.postProcessArtifacts)
? meta.postProcessArtifacts
: [],
@@ -427,9 +584,7 @@ export function rollbackBatch(graph, journal) {
journal.archivedNodeSnapshots ||
[];
const previousEdgeSnapshots =
journal.previousEdgeSnapshots ||
journal.invalidatedEdgeSnapshots ||
[];
journal.previousEdgeSnapshots || journal.invalidatedEdgeSnapshots || [];
graph.edges = (graph.edges || []).filter(
(edge) =>
@@ -437,7 +592,9 @@ export function rollbackBatch(graph, journal) {
!createdNodeIds.has(edge.fromId) &&
!createdNodeIds.has(edge.toId),
);
graph.nodes = (graph.nodes || []).filter((node) => !createdNodeIds.has(node.id));
graph.nodes = (graph.nodes || []).filter(
(node) => !createdNodeIds.has(node.id),
);
for (const nodeSnapshot of previousNodeSnapshots) {
upsertById(graph.nodes, cloneGraphSnapshot(nodeSnapshot));
@@ -470,7 +627,9 @@ export function findJournalRecoveryPoint(graph, dirtyFromFloor) {
return {
path: "reverse-journal",
affectedIndex,
affectedJournals: affectedJournals.map((journal) => cloneGraphSnapshot(journal)),
affectedJournals: affectedJournals.map((journal) =>
cloneGraphSnapshot(journal),
),
affectedBatchCount: affectedJournals.length,
};
}
@@ -489,6 +648,92 @@ export function findJournalRecoveryPoint(graph, dirtyFromFloor) {
return null;
}
export function buildReverseJournalRecoveryPlan(
affectedJournals = [],
dirtyFromFloor = null,
) {
const backendDeleteHashes = new Set();
const replayRequiredNodeIds = new Set();
const touchedNodeIds = new Set();
let hasLegacyGap = false;
let minProcessedFloor = Number.isFinite(dirtyFromFloor)
? dirtyFromFloor
: null;
for (const journal of affectedJournals) {
const vectorDelta = journal?.vectorDelta || {};
const insertedHashes = normalizeStringArray(
vectorDelta.insertedHashes || journal?.vectorHashesInserted || [],
);
const removedHashes = normalizeStringArray(vectorDelta.removedHashes);
const backendDeletes = normalizeStringArray(
vectorDelta.backendDeleteHashes,
);
const touchedNodes = normalizeStringArray(vectorDelta.touchedNodeIds);
const replayNodes = normalizeStringArray(vectorDelta.replayRequiredNodeIds);
const replacedMappings = normalizeMappingArray(
vectorDelta.replacedMappings,
);
const range = Array.isArray(journal?.processedRange)
? journal.processedRange
: [-1, -1];
if (Number.isFinite(range[0])) {
minProcessedFloor = Number.isFinite(minProcessedFloor)
? Math.min(minProcessedFloor, range[0])
: range[0];
}
for (const hash of insertedHashes) {
backendDeleteHashes.add(hash);
}
for (const hash of removedHashes) {
backendDeleteHashes.add(hash);
}
for (const hash of backendDeletes) {
backendDeleteHashes.add(hash);
}
for (const nodeId of touchedNodes) {
touchedNodeIds.add(nodeId);
replayRequiredNodeIds.add(nodeId);
}
for (const nodeId of replayNodes) {
replayRequiredNodeIds.add(nodeId);
}
for (const entry of replacedMappings) {
if (entry.nodeId) {
touchedNodeIds.add(entry.nodeId);
replayRequiredNodeIds.add(entry.nodeId);
}
if (entry.previousHash) backendDeleteHashes.add(entry.previousHash);
if (entry.nextHash) backendDeleteHashes.add(entry.nextHash);
}
if (
!Array.isArray(vectorDelta.removedHashes) ||
!Array.isArray(vectorDelta.replacedMappings) ||
!Array.isArray(vectorDelta.touchedNodeIds) ||
!Array.isArray(vectorDelta.replayRequiredNodeIds) ||
!Array.isArray(vectorDelta.backendDeleteHashes)
) {
hasLegacyGap = true;
}
}
const pendingRepairFromFloor = Number.isFinite(minProcessedFloor)
? minProcessedFloor
: null;
return {
backendDeleteHashes: [...backendDeleteHashes],
replayRequiredNodeIds: [...replayRequiredNodeIds],
touchedNodeIds: [...touchedNodeIds],
pendingRepairFromFloor,
legacyGapFallback: hasLegacyGap,
dirtyReason: hasLegacyGap ? "legacy-gap" : "history-recovery-replay",
};
}
export function buildRecoveryResult(status, extra = {}) {
return {
status,