perf: optimize persist/load P1 hot paths

This commit is contained in:
Youzini-afk
2026-04-22 18:34:56 +08:00
parent b1937336bd
commit cfc122244a
13 changed files with 1707 additions and 78 deletions

View File

@@ -646,6 +646,26 @@ export function buildSnapshotFromGraph(graph, options = {}) {
!Array.isArray(options.baseSnapshot)
? options.baseSnapshot
: {};
const shouldCollectDiagnostics = typeof options?.onDiagnostics === "function";
const snapshotStartedAt = shouldCollectDiagnostics ? readPersistDeltaNow() : 0;
const snapshotDiagnostics = shouldCollectDiagnostics
? {
nodeCount: 0,
edgeCount: 0,
tombstoneCount: 0,
reusedNodeCount: 0,
reusedEdgeCount: 0,
reusedTombstoneCount: 0,
clonedNodeCount: 0,
clonedEdgeCount: 0,
clonedTombstoneCount: 0,
nodesMs: 0,
edgesMs: 0,
tombstonesMs: 0,
stateMs: 0,
metaMs: 0,
}
: null;
const baseSnapshot = sanitizeSnapshot(baseSnapshotInput);
const baseSnapshotView = normalizePersistSnapshotView(baseSnapshotInput);
const nowMs = normalizeTimestamp(options.nowMs, Date.now());
@@ -674,6 +694,7 @@ export function buildSnapshotFromGraph(graph, options = {}) {
baseSnapshotView.tombstones,
);
const nodesStartedAt = shouldCollectDiagnostics ? readPersistDeltaNow() : 0;
const nodes = toArray(runtimeGraph?.nodes)
.map((node) => {
if (!node || typeof node !== "object" || Array.isArray(node)) {
@@ -689,18 +710,29 @@ export function buildSnapshotFromGraph(graph, options = {}) {
updatedAt: normalizedUpdatedAt,
})
) {
if (snapshotDiagnostics) {
snapshotDiagnostics.reusedNodeCount += 1;
}
return baseNode;
}
const plainNode = clonePersistSnapshotRecord(node);
if (!plainNode || typeof plainNode !== "object" || Array.isArray(plainNode)) {
return null;
}
if (snapshotDiagnostics) {
snapshotDiagnostics.clonedNodeCount += 1;
}
plainNode.id = id;
plainNode.updatedAt = normalizedUpdatedAt;
return plainNode;
})
.filter(Boolean);
if (snapshotDiagnostics) {
snapshotDiagnostics.nodeCount = nodes.length;
snapshotDiagnostics.nodesMs = readPersistDeltaNow() - nodesStartedAt;
}
const edgesStartedAt = shouldCollectDiagnostics ? readPersistDeltaNow() : 0;
const edges = toArray(runtimeGraph?.edges)
.map((edge) => {
if (!edge || typeof edge !== "object" || Array.isArray(edge)) {
@@ -719,12 +751,18 @@ export function buildSnapshotFromGraph(graph, options = {}) {
updatedAt: normalizedUpdatedAt,
})
) {
if (snapshotDiagnostics) {
snapshotDiagnostics.reusedEdgeCount += 1;
}
return baseEdge;
}
const plainEdge = clonePersistSnapshotRecord(edge);
if (!plainEdge || typeof plainEdge !== "object" || Array.isArray(plainEdge)) {
return null;
}
if (snapshotDiagnostics) {
snapshotDiagnostics.clonedEdgeCount += 1;
}
plainEdge.id = id;
plainEdge.fromId = normalizedFromId;
plainEdge.toId = normalizedToId;
@@ -732,7 +770,12 @@ export function buildSnapshotFromGraph(graph, options = {}) {
return plainEdge;
})
.filter(Boolean);
if (snapshotDiagnostics) {
snapshotDiagnostics.edgeCount = edges.length;
snapshotDiagnostics.edgesMs = readPersistDeltaNow() - edgesStartedAt;
}
const tombstonesStartedAt = shouldCollectDiagnostics ? readPersistDeltaNow() : 0;
const tombstones = toArray(options.tombstones ?? baseSnapshotView.tombstones)
.map((record) => {
if (!record || typeof record !== "object" || Array.isArray(record))
@@ -752,12 +795,18 @@ export function buildSnapshotFromGraph(graph, options = {}) {
deletedAt: normalizedDeletedAt,
})
) {
if (snapshotDiagnostics) {
snapshotDiagnostics.reusedTombstoneCount += 1;
}
return baseTombstone;
}
const plainRecord = clonePersistSnapshotRecord(record);
if (!plainRecord || typeof plainRecord !== "object" || Array.isArray(plainRecord)) {
return null;
}
if (snapshotDiagnostics) {
snapshotDiagnostics.clonedTombstoneCount += 1;
}
plainRecord.id = id;
plainRecord.kind = normalizedKind;
plainRecord.targetId = normalizedTargetId;
@@ -766,7 +815,13 @@ export function buildSnapshotFromGraph(graph, options = {}) {
return plainRecord;
})
.filter(Boolean);
if (snapshotDiagnostics) {
snapshotDiagnostics.tombstoneCount = tombstones.length;
snapshotDiagnostics.tombstonesMs =
readPersistDeltaNow() - tombstonesStartedAt;
}
const stateStartedAt = shouldCollectDiagnostics ? readPersistDeltaNow() : 0;
const state = {
...normalizeStateSnapshot(baseSnapshot),
...(options.state || {}),
@@ -783,7 +838,11 @@ export function buildSnapshotFromGraph(graph, options = {}) {
? Number(runtimeGraph.historyState.extractionCount)
: META_DEFAULT_EXTRACTION_COUNT,
};
if (snapshotDiagnostics) {
snapshotDiagnostics.stateMs = readPersistDeltaNow() - stateStartedAt;
}
const metaStartedAt = shouldCollectDiagnostics ? readPersistDeltaNow() : 0;
const mergedMeta = {
...baseSnapshot.meta,
...(options.meta || {}),
@@ -869,14 +928,26 @@ export function buildSnapshotFromGraph(graph, options = {}) {
? Number(runtimeGraph.version)
: Number(baseSnapshot.meta?.[BME_RUNTIME_GRAPH_VERSION_META_KEY] || 0),
};
if (snapshotDiagnostics) {
snapshotDiagnostics.metaMs = readPersistDeltaNow() - metaStartedAt;
}
return {
const snapshotResult = {
meta: mergedMeta,
nodes,
edges,
tombstones,
state,
};
if (snapshotDiagnostics) {
emitOptionalDiagnostics(options, {
...snapshotDiagnostics,
runtimeMetaKeyCount: Object.keys(mergedMeta).length,
totalMs: readPersistDeltaNow() - snapshotStartedAt,
});
}
return snapshotResult;
}
function normalizeSnapshotMetaState(snapshot = {}) {
@@ -1630,7 +1701,7 @@ function readPersistDeltaNow() {
return Date.now();
}
function emitPersistDeltaDiagnostics(options = {}, snapshot = null) {
function emitOptionalDiagnostics(options = {}, snapshot = null) {
if (typeof options?.onDiagnostics !== "function") return;
try {
options.onDiagnostics(snapshot ? toPlainData(snapshot, snapshot) : null);
@@ -1639,6 +1710,10 @@ function emitPersistDeltaDiagnostics(options = {}, snapshot = null) {
}
}
function emitPersistDeltaDiagnostics(options = {}, snapshot = null) {
emitOptionalDiagnostics(options, snapshot);
}
function tryBuildNativePersistDelta(
beforeSnapshot,
afterSnapshot,
@@ -2016,6 +2091,23 @@ export function buildPersistDelta(beforeSnapshot, afterSnapshot, options = {}) {
}
export function buildGraphFromSnapshot(snapshot, options = {}) {
const shouldCollectDiagnostics = typeof options?.onDiagnostics === "function";
const hydrateStartedAt = shouldCollectDiagnostics ? readPersistDeltaNow() : 0;
const hydrateDiagnostics = shouldCollectDiagnostics
? {
success: false,
nodeCount: 0,
edgeCount: 0,
tombstoneCount: 0,
nodesMs: 0,
edgesMs: 0,
runtimeMetaMs: 0,
stateMs: 0,
normalizeMs: 0,
integrityMs: 0,
integrityReasonCount: 0,
}
: null;
const snapshotView = normalizePersistSnapshotView(snapshot);
const snapshotMeta =
snapshotView.meta &&
@@ -2048,8 +2140,24 @@ export function buildGraphFromSnapshot(snapshot, options = {}) {
)
? Number(snapshotMeta[BME_RUNTIME_GRAPH_VERSION_META_KEY])
: runtimeGraph.version;
const hydrateNodesStartedAt = shouldCollectDiagnostics ? readPersistDeltaNow() : 0;
runtimeGraph.nodes = toArray(toPlainData(snapshotView.nodes, []));
if (hydrateDiagnostics) {
hydrateDiagnostics.nodeCount = runtimeGraph.nodes.length;
hydrateDiagnostics.nodesMs = readPersistDeltaNow() - hydrateNodesStartedAt;
}
const hydrateEdgesStartedAt = shouldCollectDiagnostics ? readPersistDeltaNow() : 0;
runtimeGraph.edges = toArray(toPlainData(snapshotView.edges, []));
if (hydrateDiagnostics) {
hydrateDiagnostics.edgeCount = runtimeGraph.edges.length;
hydrateDiagnostics.edgesMs = readPersistDeltaNow() - hydrateEdgesStartedAt;
}
const hydrateRuntimeMetaStartedAt = shouldCollectDiagnostics
? readPersistDeltaNow()
: 0;
runtimeGraph.batchJournal = toArray(
toPlainData(snapshotMeta?.[BME_RUNTIME_BATCH_JOURNAL_META_KEY], []),
);
@@ -2076,6 +2184,10 @@ export function buildGraphFromSnapshot(snapshot, options = {}) {
snapshotMeta?.[BME_RUNTIME_SUMMARY_STATE_META_KEY],
runtimeGraph.summaryState || {},
);
if (hydrateDiagnostics) {
hydrateDiagnostics.runtimeMetaMs =
readPersistDeltaNow() - hydrateRuntimeMetaStartedAt;
}
const rawKnowledgeState =
runtimeGraph.knowledgeState &&
typeof runtimeGraph.knowledgeState === "object" &&
@@ -2095,6 +2207,7 @@ export function buildGraphFromSnapshot(snapshot, options = {}) {
? runtimeGraph.timelineState
: {};
const hydrateStateStartedAt = shouldCollectDiagnostics ? readPersistDeltaNow() : 0;
runtimeGraph.historyState = {
...(runtimeGraph.historyState || {}),
...snapshotHistoryState,
@@ -2183,8 +2296,16 @@ export function buildGraphFromSnapshot(snapshot, options = {}) {
)
? Number(snapshotMeta[BME_RUNTIME_LAST_PROCESSED_SEQ_META_KEY])
: Number(runtimeGraph.historyState.lastProcessedAssistantFloor);
if (hydrateDiagnostics) {
hydrateDiagnostics.tombstoneCount = toArray(snapshotView.tombstones).length;
hydrateDiagnostics.stateMs = readPersistDeltaNow() - hydrateStateStartedAt;
}
const normalizeStartedAt = shouldCollectDiagnostics ? readPersistDeltaNow() : 0;
const normalizedGraph = normalizeGraphRuntimeState(runtimeGraph, chatId);
if (hydrateDiagnostics) {
hydrateDiagnostics.normalizeMs = readPersistDeltaNow() - normalizeStartedAt;
}
if (
normalizedGraph.knowledgeState &&
typeof normalizedGraph.knowledgeState === "object" &&
@@ -2238,6 +2359,7 @@ export function buildGraphFromSnapshot(snapshot, options = {}) {
);
const inconsistentReasons = [];
const integrityStartedAt = shouldCollectDiagnostics ? readPersistDeltaNow() : 0;
if (
Number.isFinite(resolvedLastProcessedFloor) &&
Number.isFinite(resolvedLastProcessedSeq) &&
@@ -2255,8 +2377,20 @@ export function buildGraphFromSnapshot(snapshot, options = {}) {
if (collectionId && collectionId !== expectedCollectionId) {
inconsistentReasons.push("vector-collection-mismatch");
}
if (hydrateDiagnostics) {
hydrateDiagnostics.integrityMs = readPersistDeltaNow() - integrityStartedAt;
hydrateDiagnostics.integrityReasonCount = inconsistentReasons.length;
}
if (inconsistentReasons.length > 0) {
if (hydrateDiagnostics) {
emitOptionalDiagnostics(options, {
...hydrateDiagnostics,
success: false,
integrityReasons: [...inconsistentReasons],
totalMs: readPersistDeltaNow() - hydrateStartedAt,
});
}
const error = new Error(
`图谱快照完整性校验失败: ${inconsistentReasons.join(", ")}`,
);
@@ -2266,6 +2400,15 @@ export function buildGraphFromSnapshot(snapshot, options = {}) {
throw error;
}
if (hydrateDiagnostics) {
emitOptionalDiagnostics(options, {
...hydrateDiagnostics,
success: true,
integrityReasons: [],
totalMs: readPersistDeltaNow() - hydrateStartedAt,
});
}
return normalizedGraph;
}
@@ -3151,6 +3294,40 @@ export class BmeDatabase {
return snapshot;
}
async exportSnapshotProbe() {
const db = await this.open();
const metaRows = await db.transaction("r", db.table("meta"), async () =>
await db.table("meta").toArray(),
);
const metaMap = toMetaMap(metaRows);
const meta = {
...metaMap,
schemaVersion: BME_DB_SCHEMA_VERSION,
chatId: this.chatId,
revision: normalizeRevision(metaMap?.revision),
nodeCount: normalizeNonNegativeInteger(metaMap?.nodeCount, 0),
edgeCount: normalizeNonNegativeInteger(metaMap?.edgeCount, 0),
tombstoneCount: normalizeNonNegativeInteger(metaMap?.tombstoneCount, 0),
};
const state = {
lastProcessedFloor: Number.isFinite(Number(meta.lastProcessedFloor))
? Number(meta.lastProcessedFloor)
: META_DEFAULT_LAST_PROCESSED_FLOOR,
extractionCount: Number.isFinite(Number(meta.extractionCount))
? Number(meta.extractionCount)
: META_DEFAULT_EXTRACTION_COUNT,
};
return {
meta,
state,
nodes: [],
edges: [],
tombstones: [],
__stBmeProbeOnly: true,
__stBmeTombstonesOmitted: true,
};
}
async importSnapshot(snapshot, options = {}) {
const db = await this.open();
const normalizedSnapshot = sanitizeSnapshot(snapshot);