perf: optimize persist-delta snapshot reuse and diagnostics

This commit is contained in:
Youzini-afk
2026-04-13 18:21:07 +08:00
parent d7460f1618
commit e4ab3ac41d
8 changed files with 965 additions and 121 deletions

View File

@@ -20,6 +20,13 @@ const DEFAULT_PERSIST_NATIVE_DELTA_THRESHOLD_STRUCTURAL_DELTA = 600;
const DEFAULT_PERSIST_NATIVE_DELTA_THRESHOLD_SERIALIZED_CHARS = 4000000;
const DEFAULT_PERSIST_NATIVE_DELTA_BRIDGE_MODE = "json";
const SUPPORTED_PERSIST_NATIVE_DELTA_BRIDGE_MODES = new Set(["json", "hash"]);
const PERSIST_RECORD_SERIALIZATION_CACHE_LIMIT = 50000;
const persistRecordSerializationCacheByObject = new WeakMap();
const persistRecordSerializationCacheByToken = new Map();
let persistRecordSerializationCacheEpoch = 1;
const persistPreparedRecordSetCacheByArray = new WeakMap();
let persistPreparedRecordSetCacheEpoch = 1;
export const BME_RUNTIME_HISTORY_META_KEY = "runtimeHistoryState";
export const BME_RUNTIME_VECTOR_META_KEY = "runtimeVectorIndexState";
@@ -361,7 +368,7 @@ function normalizeNodeUpdatedAt(node = {}, fallbackNowMs = Date.now()) {
function normalizeEdgeUpdatedAt(edge = {}, fallbackNowMs = Date.now()) {
return normalizeTimestamp(
edge.updatedAt ?? edge.validAt ?? edge.createdTime,
edge.updatedAt ?? edge.invalidAt ?? edge.expiredAt ?? edge.validAt ?? edge.createdTime,
fallbackNowMs,
);
}
@@ -410,27 +417,187 @@ function deriveEdgeSourceFloor(edge = {}, nodeSourceFloorById = new Map()) {
return null;
}
function clonePersistGraphInputRecord(record = null) {
if (!record || typeof record !== "object" || Array.isArray(record)) {
return null;
}
return {
...record,
};
}
function buildPersistSnapshotGraphInput(graph = null, chatId = "") {
const sourceGraph =
graph && typeof graph === "object" && !Array.isArray(graph)
? graph
: createEmptyGraph();
const graphInput = {
...sourceGraph,
historyState:
sourceGraph.historyState &&
typeof sourceGraph.historyState === "object" &&
!Array.isArray(sourceGraph.historyState)
? { ...sourceGraph.historyState }
: {},
vectorIndexState:
sourceGraph.vectorIndexState &&
typeof sourceGraph.vectorIndexState === "object" &&
!Array.isArray(sourceGraph.vectorIndexState)
? { ...sourceGraph.vectorIndexState }
: {},
nodes: toArray(sourceGraph.nodes)
.map((node) => clonePersistGraphInputRecord(node))
.filter(Boolean),
edges: toArray(sourceGraph.edges)
.map((edge) => clonePersistGraphInputRecord(edge))
.filter(Boolean),
batchJournal: Array.isArray(sourceGraph.batchJournal)
? [...sourceGraph.batchJournal]
: sourceGraph.batchJournal,
maintenanceJournal: Array.isArray(sourceGraph.maintenanceJournal)
? [...sourceGraph.maintenanceJournal]
: sourceGraph.maintenanceJournal,
};
if (chatId) {
graphInput.historyState.chatId = chatId;
}
return graphInput;
}
function buildPersistSnapshotRecordByIdMap(records = []) {
const map = new Map();
for (const record of toArray(records)) {
if (!record || typeof record !== "object" || Array.isArray(record)) continue;
const id = normalizeRecordId(record.id);
if (!id || map.has(id)) continue;
map.set(id, record);
}
return map;
}
function clonePersistSnapshotRecord(record = null) {
if (!record || typeof record !== "object" || Array.isArray(record)) {
return null;
}
try {
return JSON.parse(JSON.stringify(record));
} catch {
if (typeof globalThis.structuredClone === "function") {
try {
return globalThis.structuredClone(record);
} catch {
// no-op
}
}
return null;
}
}
function normalizeComparablePersistNumber(value) {
const parsed = Number(value);
return Number.isFinite(parsed) ? parsed : null;
}
function hasReusablePersistNodeRecord(baseRecord, runtimeRecord, normalized = {}) {
if (!baseRecord || !runtimeRecord) return false;
const normalizedType = normalizeRecordId(normalized.type ?? runtimeRecord.type);
if (normalizeRecordId(baseRecord.type) !== normalizedType) return false;
if (Boolean(baseRecord.archived) !== Boolean(runtimeRecord.archived)) return false;
if (
normalizeComparablePersistNumber(baseRecord.updatedAt) !==
normalizeComparablePersistNumber(normalized.updatedAt)
) {
return false;
}
if (
normalizeComparablePersistNumber(baseRecord.seq) !==
normalizeComparablePersistNumber(runtimeRecord.seq)
) {
return false;
}
if (normalizeRecordId(baseRecord.parentId) !== normalizeRecordId(runtimeRecord.parentId)) {
return false;
}
if (normalizeRecordId(baseRecord.prevId) !== normalizeRecordId(runtimeRecord.prevId)) {
return false;
}
if (normalizeRecordId(baseRecord.nextId) !== normalizeRecordId(runtimeRecord.nextId)) {
return false;
}
return true;
}
function hasReusablePersistEdgeRecord(baseRecord, runtimeRecord, normalized = {}) {
if (!baseRecord || !runtimeRecord) return false;
if (normalizeRecordId(baseRecord.fromId) !== normalizeRecordId(normalized.fromId)) {
return false;
}
if (normalizeRecordId(baseRecord.toId) !== normalizeRecordId(normalized.toId)) {
return false;
}
if (normalizeRecordId(baseRecord.relation) !== normalizeRecordId(runtimeRecord.relation)) {
return false;
}
if (
normalizeComparablePersistNumber(baseRecord.updatedAt) !==
normalizeComparablePersistNumber(normalized.updatedAt)
) {
return false;
}
if (
normalizeComparablePersistNumber(baseRecord.invalidAt) !==
normalizeComparablePersistNumber(runtimeRecord.invalidAt)
) {
return false;
}
if (
normalizeComparablePersistNumber(baseRecord.expiredAt) !==
normalizeComparablePersistNumber(runtimeRecord.expiredAt)
) {
return false;
}
return true;
}
function hasReusablePersistTombstoneRecord(baseRecord, normalized = {}) {
if (!baseRecord) return false;
if (normalizeRecordId(baseRecord.kind) !== normalizeRecordId(normalized.kind)) {
return false;
}
if (normalizeRecordId(baseRecord.targetId) !== normalizeRecordId(normalized.targetId)) {
return false;
}
if (
normalizeRecordId(baseRecord.sourceDeviceId) !==
normalizeRecordId(normalized.sourceDeviceId)
) {
return false;
}
if (
normalizeComparablePersistNumber(baseRecord.deletedAt) !==
normalizeComparablePersistNumber(normalized.deletedAt)
) {
return false;
}
return true;
}
export function buildSnapshotFromGraph(graph, options = {}) {
const baseSnapshot = sanitizeSnapshot(options.baseSnapshot || {});
const baseSnapshotInput =
options?.baseSnapshot &&
typeof options.baseSnapshot === "object" &&
!Array.isArray(options.baseSnapshot)
? options.baseSnapshot
: {};
const baseSnapshot = sanitizeSnapshot(baseSnapshotInput);
const baseSnapshotView = normalizePersistSnapshotView(baseSnapshotInput);
const nowMs = normalizeTimestamp(options.nowMs, Date.now());
const chatId =
normalizeChatId(options.chatId) ||
normalizeChatId(graph?.historyState?.chatId) ||
normalizeChatId(baseSnapshot.meta?.chatId);
const graphInput = toPlainData(graph, createEmptyGraph());
if (!graphInput.historyState || typeof graphInput.historyState !== "object") {
graphInput.historyState = {};
}
if (
!graphInput.vectorIndexState ||
typeof graphInput.vectorIndexState !== "object"
) {
graphInput.vectorIndexState = {};
}
if (chatId) {
graphInput.historyState.chatId = chatId;
}
const graphInput = buildPersistSnapshotGraphInput(graph, chatId);
const legacyActiveOwnerKey = String(
graphInput?.knowledgeState?.activeOwnerKey || "",
).trim();
@@ -444,49 +611,102 @@ export function buildSnapshotFromGraph(graph, options = {}) {
chatId || graphInput.historyState.chatId || "",
);
const runtimeGraph = normalizeGraphRuntimeState(graphInput, chatId);
const baseNodeById = buildPersistSnapshotRecordByIdMap(baseSnapshotView.nodes);
const baseEdgeById = buildPersistSnapshotRecordByIdMap(baseSnapshotView.edges);
const baseTombstoneById = buildPersistSnapshotRecordByIdMap(
baseSnapshotView.tombstones,
);
const nodes = toArray(runtimeGraph?.nodes)
.map((node) => {
if (!node || typeof node !== "object" || Array.isArray(node)) return null;
if (!node || typeof node !== "object" || Array.isArray(node)) {
return null;
}
const id = normalizeRecordId(node.id);
if (!id) return null;
return {
...node,
id,
updatedAt: normalizeNodeUpdatedAt(node, nowMs),
};
const normalizedUpdatedAt = normalizeNodeUpdatedAt(node, nowMs);
const baseNode = baseNodeById.get(id);
if (
hasReusablePersistNodeRecord(baseNode, node, {
type: node.type,
updatedAt: normalizedUpdatedAt,
})
) {
return baseNode;
}
const plainNode = clonePersistSnapshotRecord(node);
if (!plainNode || typeof plainNode !== "object" || Array.isArray(plainNode)) {
return null;
}
plainNode.id = id;
plainNode.updatedAt = normalizedUpdatedAt;
return plainNode;
})
.filter(Boolean);
const edges = toArray(runtimeGraph?.edges)
.map((edge) => {
if (!edge || typeof edge !== "object" || Array.isArray(edge)) return null;
if (!edge || typeof edge !== "object" || Array.isArray(edge)) {
return null;
}
const id = normalizeRecordId(edge.id);
if (!id) return null;
return {
...edge,
id,
fromId: normalizeRecordId(edge.fromId),
toId: normalizeRecordId(edge.toId),
updatedAt: normalizeEdgeUpdatedAt(edge, nowMs),
};
const normalizedFromId = normalizeRecordId(edge.fromId);
const normalizedToId = normalizeRecordId(edge.toId);
const normalizedUpdatedAt = normalizeEdgeUpdatedAt(edge, nowMs);
const baseEdge = baseEdgeById.get(id);
if (
hasReusablePersistEdgeRecord(baseEdge, edge, {
fromId: normalizedFromId,
toId: normalizedToId,
updatedAt: normalizedUpdatedAt,
})
) {
return baseEdge;
}
const plainEdge = clonePersistSnapshotRecord(edge);
if (!plainEdge || typeof plainEdge !== "object" || Array.isArray(plainEdge)) {
return null;
}
plainEdge.id = id;
plainEdge.fromId = normalizedFromId;
plainEdge.toId = normalizedToId;
plainEdge.updatedAt = normalizedUpdatedAt;
return plainEdge;
})
.filter(Boolean);
const tombstones = toArray(options.tombstones ?? baseSnapshot.tombstones)
const tombstones = toArray(options.tombstones ?? baseSnapshotView.tombstones)
.map((record) => {
if (!record || typeof record !== "object" || Array.isArray(record))
return null;
const id = normalizeRecordId(record.id);
if (!id) return null;
return {
...record,
id,
kind: normalizeRecordId(record.kind),
targetId: normalizeRecordId(record.targetId),
sourceDeviceId: normalizeRecordId(record.sourceDeviceId),
deletedAt: normalizeTimestamp(record.deletedAt, nowMs),
};
const normalizedKind = normalizeRecordId(record.kind);
const normalizedTargetId = normalizeRecordId(record.targetId);
const normalizedSourceDeviceId = normalizeRecordId(record.sourceDeviceId);
const normalizedDeletedAt = normalizeTimestamp(record.deletedAt, nowMs);
const baseTombstone = baseTombstoneById.get(id);
if (
hasReusablePersistTombstoneRecord(baseTombstone, {
kind: normalizedKind,
targetId: normalizedTargetId,
sourceDeviceId: normalizedSourceDeviceId,
deletedAt: normalizedDeletedAt,
})
) {
return baseTombstone;
}
const plainRecord = clonePersistSnapshotRecord(record);
if (!plainRecord || typeof plainRecord !== "object" || Array.isArray(plainRecord)) {
return null;
}
plainRecord.id = id;
plainRecord.kind = normalizedKind;
plainRecord.targetId = normalizedTargetId;
plainRecord.sourceDeviceId = normalizedSourceDeviceId;
plainRecord.deletedAt = normalizedDeletedAt;
return plainRecord;
})
.filter(Boolean);
@@ -635,6 +855,145 @@ function hashPersistSerializedRecord32(value = "") {
return hash >>> 0;
}
function resolvePersistRecordSerializationVersion(record = {}) {
const candidates = [
record?.updatedAt,
record?.deletedAt,
record?.invalidAt,
record?.expiredAt,
record?.validAt,
record?.lastModified,
record?.createdTime,
record?.lastAccessTime,
];
for (const value of candidates) {
const parsed = Number(value);
if (Number.isFinite(parsed)) return Math.floor(parsed);
}
return null;
}
function resolvePersistRecordSerializationCacheToken(record = {}) {
const id = normalizeRecordId(record?.id);
const version = resolvePersistRecordSerializationVersion(record);
if (!id || version == null) return "";
return [
id,
version,
normalizeRecordId(record?.kind),
normalizeRecordId(record?.targetId),
normalizeRecordId(record?.fromId),
normalizeRecordId(record?.toId),
normalizeRecordId(record?.type),
normalizeRecordId(record?.relation),
record?.archived === true ? "1" : "0",
].join("|");
}
function recordPersistSerializationCacheStat(stats = null, key = "") {
if (!stats || typeof stats !== "object" || !key) return;
stats[key] = Number(stats[key] || 0) + 1;
}
function recordPersistPreparedRecordSetCacheStat(stats = null, key = "") {
if (!stats || typeof stats !== "object" || !key) return;
stats[key] = Number(stats[key] || 0) + 1;
}
function resolvePreparedRecordSetCacheKey(options = {}) {
return [
options?.includeSerializedList === true ? "s1" : "s0",
options?.includeHashList === true ? "h1" : "h0",
options?.includeSerializedLookup !== false ? "l1" : "l0",
options?.includeSerializedCharCount === true ? "c1" : "c0",
options?.includeTargetKeys === true ? "t1" : "t0",
].join("|");
}
function touchPersistRecordSerializationTokenCache(token, entry) {
if (!token || !entry) return;
if (persistRecordSerializationCacheByToken.has(token)) {
persistRecordSerializationCacheByToken.delete(token);
}
persistRecordSerializationCacheByToken.set(token, entry);
while (
persistRecordSerializationCacheByToken.size >
PERSIST_RECORD_SERIALIZATION_CACHE_LIMIT
) {
const oldestKey = persistRecordSerializationCacheByToken.keys().next().value;
if (!oldestKey) break;
persistRecordSerializationCacheByToken.delete(oldestKey);
}
}
function ensurePersistRecordSerializationHash(entry = null) {
if (!entry || typeof entry !== "object") return 0;
if (!Number.isFinite(Number(entry.hash))) {
entry.hash = hashPersistSerializedRecord32(String(entry.json || ""));
}
return Number(entry.hash) >>> 0;
}
function getPersistRecordSerialization(
record,
{ includeHash = false, cacheStats = null } = {},
) {
if (!record || typeof record !== "object" || Array.isArray(record)) {
const emptyEntry = { token: "", json: "null", length: 4, hash: 1996966820 };
if (includeHash) emptyEntry.hash = hashPersistSerializedRecord32(emptyEntry.json);
return emptyEntry;
}
const token = resolvePersistRecordSerializationCacheToken(record);
const cachedByObject = token
? persistRecordSerializationCacheByObject.get(record)
: null;
if (
cachedByObject &&
cachedByObject.token === token &&
cachedByObject.epoch === persistRecordSerializationCacheEpoch
) {
recordPersistSerializationCacheStat(cacheStats, "objectHitCount");
if (includeHash) ensurePersistRecordSerializationHash(cachedByObject);
return cachedByObject;
}
const cachedByToken = token ? persistRecordSerializationCacheByToken.get(token) : null;
if (cachedByToken) {
persistRecordSerializationCacheByObject.set(record, cachedByToken);
touchPersistRecordSerializationTokenCache(token, cachedByToken);
recordPersistSerializationCacheStat(cacheStats, "tokenHitCount");
if (includeHash) ensurePersistRecordSerializationHash(cachedByToken);
return cachedByToken;
}
const json = JSON.stringify(record);
const entry = {
epoch: persistRecordSerializationCacheEpoch,
token,
json,
length: json.length,
hash: includeHash ? hashPersistSerializedRecord32(json) : null,
};
if (token) {
persistRecordSerializationCacheByObject.set(record, entry);
touchPersistRecordSerializationTokenCache(token, entry);
}
recordPersistSerializationCacheStat(cacheStats, "missCount");
return entry;
}
function sumPersistSerializationCacheHits(stats = null) {
if (!stats || typeof stats !== "object") return 0;
return Number(stats.objectHitCount || 0) + Number(stats.tokenHitCount || 0);
}
export function resetPersistRecordSerializationCaches() {
persistRecordSerializationCacheEpoch += 1;
persistRecordSerializationCacheByToken.clear();
persistPreparedRecordSetCacheEpoch += 1;
}
function buildPreparedRecordSet(
records = [],
{
@@ -644,38 +1003,71 @@ function buildPreparedRecordSet(
includeHashList = false,
includeSerializedLookup = true,
includeSerializedCharCount = false,
serializationCacheStats = null,
preparedRecordSetCacheStats = null,
usePreparedRecordSetCache = true,
} = {},
) {
const sourceRecords = toArray(records);
const cacheKey =
usePreparedRecordSetCache !== false &&
Array.isArray(records) &&
sourceRecords === records
? resolvePreparedRecordSetCacheKey({
includeSerializedList,
includeHashList,
includeSerializedLookup,
includeSerializedCharCount,
includeTargetKeys,
})
: "";
if (cacheKey) {
const cachedEntry = persistPreparedRecordSetCacheByArray.get(records);
const cachedRecordSet =
cachedEntry &&
cachedEntry.epoch === persistPreparedRecordSetCacheEpoch &&
cachedEntry.values instanceof Map
? cachedEntry.values.get(cacheKey)
: null;
if (cachedRecordSet) {
recordPersistPreparedRecordSetCacheStat(preparedRecordSetCacheStats, "hitCount");
return cachedRecordSet;
}
recordPersistPreparedRecordSetCacheStat(preparedRecordSetCacheStats, "missCount");
}
const ids = [];
const serialized = includeSerializedList ? [] : null;
const hashes = includeHashList ? [] : null;
const serializedById = includeSerializedLookup ? new Map() : null;
const recordById = retainRecords ? new Map() : null;
const targetKeyById = includeTargetKeys ? new Map() : null;
const recordById = null;
const targetKeyById = null;
const targetKeys = includeTargetKeys ? [] : null;
let serializedCharCount = 0;
for (const record of sourceRecords) {
if (!record || typeof record !== "object" || Array.isArray(record)) continue;
const id = normalizeRecordId(record.id);
if (!id) continue;
const json = JSON.stringify(record);
const serializedEntry = getPersistRecordSerialization(record, {
includeHash: includeHashList,
cacheStats: serializationCacheStats,
});
const json = serializedEntry.json;
ids.push(id);
if (serialized) serialized.push(json);
if (hashes) hashes.push(hashPersistSerializedRecord32(json));
if (hashes) hashes.push(ensurePersistRecordSerializationHash(serializedEntry));
if (serializedById) serializedById.set(id, json);
if (includeSerializedCharCount) {
serializedCharCount += json.length;
serializedCharCount += serializedEntry.length;
}
if (recordById) recordById.set(id, record);
if (targetKeyById) {
if (targetKeys) {
const kind = normalizeRecordId(record.kind);
const targetId = normalizeRecordId(record.targetId);
targetKeyById.set(id, kind && targetId ? `${kind}:${targetId}` : "");
targetKeys.push(kind && targetId ? `${kind}:${targetId}` : "");
}
}
return {
const preparedRecordSet = {
ids,
serialized,
hashes,
@@ -683,11 +1075,27 @@ function buildPreparedRecordSet(
sourceRecords,
recordById,
targetKeyById,
targetKeys,
serializedCharCount,
};
if (cacheKey) {
const cachedEntry = persistPreparedRecordSetCacheByArray.get(records);
const values =
cachedEntry &&
cachedEntry.epoch === persistPreparedRecordSetCacheEpoch &&
cachedEntry.values instanceof Map
? cachedEntry.values
: new Map();
values.set(cacheKey, preparedRecordSet);
persistPreparedRecordSetCacheByArray.set(records, {
epoch: persistPreparedRecordSetCacheEpoch,
values,
});
}
return preparedRecordSet;
}
function ensurePreparedSerializedLookup(recordSet = null) {
function ensurePreparedSerializedLookup(recordSet = null, cacheStats = null) {
if (!recordSet || typeof recordSet !== "object") {
return new Map();
}
@@ -700,12 +1108,67 @@ function ensurePreparedSerializedLookup(recordSet = null) {
if (!record || typeof record !== "object" || Array.isArray(record)) continue;
const id = normalizeRecordId(record.id);
if (!id) continue;
map.set(id, JSON.stringify(record));
map.set(
id,
getPersistRecordSerialization(record, {
cacheStats,
}).json,
);
}
recordSet.serializedById = map;
return map;
}
function ensurePreparedRecordLookup(recordSet = null) {
if (!recordSet || typeof recordSet !== "object") {
return new Map();
}
if (recordSet.recordById instanceof Map) {
return recordSet.recordById;
}
const map = new Map();
for (const record of toArray(recordSet.sourceRecords)) {
if (!record || typeof record !== "object" || Array.isArray(record)) continue;
const id = normalizeRecordId(record.id);
if (!id) continue;
map.set(id, record);
}
recordSet.recordById = map;
return map;
}
function ensurePreparedTargetKeyLookup(recordSet = null) {
if (!recordSet || typeof recordSet !== "object") {
return new Map();
}
if (recordSet.targetKeyById instanceof Map) {
return recordSet.targetKeyById;
}
const map = new Map();
if (
Array.isArray(recordSet.ids) &&
Array.isArray(recordSet.targetKeys) &&
recordSet.ids.length === recordSet.targetKeys.length
) {
for (let index = 0; index < recordSet.ids.length; index++) {
map.set(recordSet.ids[index], String(recordSet.targetKeys[index] || ""));
}
} else {
for (const record of toArray(recordSet.sourceRecords)) {
if (!record || typeof record !== "object" || Array.isArray(record)) continue;
const id = normalizeRecordId(record.id);
if (!id) continue;
const kind = normalizeRecordId(record.kind);
const targetId = normalizeRecordId(record.targetId);
map.set(id, kind && targetId ? `${kind}:${targetId}` : "");
}
}
recordSet.targetKeyById = map;
return map;
}
function buildPreparedPersistDeltaContext(
beforeSnapshot,
afterSnapshot,
@@ -725,11 +1188,27 @@ function buildPreparedPersistDeltaContext(
const includeCompactHashList = compactPayloadMode === "hash";
const includeSerializedLookup = options.includeSerializedLookup !== false;
const includeSerializedCharCount = options.includeSerializedCharCount === true;
const serializationCacheStats =
options?.serializationCacheStats &&
typeof options.serializationCacheStats === "object" &&
!Array.isArray(options.serializationCacheStats)
? options.serializationCacheStats
: null;
const preparedRecordSetCacheStats =
options?.preparedRecordSetCacheStats &&
typeof options.preparedRecordSetCacheStats === "object" &&
!Array.isArray(options.preparedRecordSetCacheStats)
? options.preparedRecordSetCacheStats
: null;
const usePreparedRecordSetCache = options?.usePreparedRecordSetCache !== false;
const beforeNodes = buildPreparedRecordSet(beforeSnapshot.nodes, {
includeSerializedList: includeCompactSerializedList,
includeHashList: includeCompactHashList,
includeSerializedLookup,
includeSerializedCharCount,
serializationCacheStats,
preparedRecordSetCacheStats,
usePreparedRecordSetCache,
});
const afterNodes = buildPreparedRecordSet(afterSnapshot.nodes, {
retainRecords: true,
@@ -737,12 +1216,18 @@ function buildPreparedPersistDeltaContext(
includeHashList: includeCompactHashList,
includeSerializedLookup,
includeSerializedCharCount,
serializationCacheStats,
preparedRecordSetCacheStats,
usePreparedRecordSetCache,
});
const beforeEdges = buildPreparedRecordSet(beforeSnapshot.edges, {
includeSerializedList: includeCompactSerializedList,
includeHashList: includeCompactHashList,
includeSerializedLookup,
includeSerializedCharCount,
serializationCacheStats,
preparedRecordSetCacheStats,
usePreparedRecordSetCache,
});
const afterEdges = buildPreparedRecordSet(afterSnapshot.edges, {
retainRecords: true,
@@ -750,12 +1235,18 @@ function buildPreparedPersistDeltaContext(
includeHashList: includeCompactHashList,
includeSerializedLookup,
includeSerializedCharCount,
serializationCacheStats,
preparedRecordSetCacheStats,
usePreparedRecordSetCache,
});
const beforeTombstones = buildPreparedRecordSet(beforeSnapshot.tombstones, {
includeSerializedList: includeCompactSerializedList,
includeHashList: includeCompactHashList,
includeSerializedLookup,
includeSerializedCharCount,
serializationCacheStats,
preparedRecordSetCacheStats,
usePreparedRecordSetCache,
});
const afterTombstones = buildPreparedRecordSet(afterSnapshot.tombstones, {
retainRecords: true,
@@ -764,6 +1255,9 @@ function buildPreparedPersistDeltaContext(
includeHashList: includeCompactHashList,
includeSerializedLookup,
includeSerializedCharCount,
serializationCacheStats,
preparedRecordSetCacheStats,
usePreparedRecordSetCache,
});
const sourceDeviceId = normalizeRecordId(
afterSnapshot.meta?.deviceId || beforeSnapshot.meta?.deviceId || "",
@@ -803,6 +1297,7 @@ function buildPreparedPersistDeltaContext(
Math.abs(afterTombstones.ids.length - beforeTombstones.ids.length),
beforeSerializedChars,
afterSerializedChars,
serializationCacheStats,
compactPayload:
compactPayloadMode === "json"
? {
@@ -995,10 +1490,19 @@ function buildPersistDeltaFromIdShape(preparedContext, delta = null) {
const normalized = normalizePersistDeltaIdShape(delta);
if (!normalized) return null;
const afterNodeRecordById = ensurePreparedRecordLookup(preparedContext.afterNodes);
const afterEdgeRecordById = ensurePreparedRecordLookup(preparedContext.afterEdges);
const afterTombstoneRecordById = ensurePreparedRecordLookup(
preparedContext.afterTombstones,
);
const afterTombstoneTargetKeyById = ensurePreparedTargetKeyLookup(
preparedContext.afterTombstones,
);
const tombstoneMap = new Map();
for (const id of normalized.upsertTombstoneIds) {
const record = preparedContext.afterTombstones.recordById?.get(id);
const targetKey = preparedContext.afterTombstones.targetKeyById?.get(id) || "";
const record = afterTombstoneRecordById.get(id);
const targetKey = afterTombstoneTargetKeyById.get(id) || "";
if (!record || !targetKey) continue;
tombstoneMap.set(targetKey, record);
}
@@ -1024,11 +1528,11 @@ function buildPersistDeltaFromIdShape(preparedContext, delta = null) {
return {
upsertNodes: hydratePreparedRecords(
preparedContext.afterNodes.recordById,
afterNodeRecordById,
normalized.upsertNodeIds,
),
upsertEdges: hydratePreparedRecords(
preparedContext.afterEdges.recordById,
afterEdgeRecordById,
normalized.upsertEdgeIds,
),
deleteNodeIds: normalized.deleteNodeIds,
@@ -1103,6 +1607,26 @@ function tryBuildNativePersistDelta(
export function buildPersistDelta(beforeSnapshot, afterSnapshot, options = {}) {
const shouldCollectDiagnostics = typeof options?.onDiagnostics === "function";
const startedAt = shouldCollectDiagnostics ? readPersistDeltaNow() : 0;
const timings = shouldCollectDiagnostics
? {
prepareMs: 0,
nativeAttemptMs: 0,
lookupMs: 0,
jsDiffMs: 0,
hydrateMs: 0,
}
: null;
const serializationCacheStats = {
objectHitCount: 0,
tokenHitCount: 0,
missCount: 0,
};
const preparedRecordSetCacheStats = shouldCollectDiagnostics
? {
hitCount: 0,
missCount: 0,
}
: null;
const normalizedBefore = normalizePersistSnapshotView(beforeSnapshot);
const normalizedAfter = normalizePersistSnapshotView(afterSnapshot);
const nowMs = normalizeTimestamp(options.nowMs, Date.now());
@@ -1115,6 +1639,7 @@ export function buildPersistDelta(beforeSnapshot, afterSnapshot, options = {}) {
shouldCollectDiagnostics ||
(options?.useNativeDelta === true &&
(nativeGateOptions?.minCombinedSerializedChars || 0) > 0);
const prepareStartedAt = shouldCollectDiagnostics ? readPersistDeltaNow() : 0;
const preparedContext = buildPreparedPersistDeltaContext(
normalizedBefore,
normalizedAfter,
@@ -1123,8 +1648,14 @@ export function buildPersistDelta(beforeSnapshot, afterSnapshot, options = {}) {
compactPayloadMode: options?.useNativeDelta === true ? nativeBridgeMode : "none",
includeSerializedLookup: options?.useNativeDelta !== true,
includeSerializedCharCount: shouldMeasureSerializedChars,
serializationCacheStats,
preparedRecordSetCacheStats,
usePreparedRecordSetCache: options?.usePreparedRecordSetCache !== false,
},
);
if (timings) {
timings.prepareMs = readPersistDeltaNow() - prepareStartedAt;
}
const combinedSerializedChars =
preparedContext.beforeSerializedChars + preparedContext.afterSerializedChars;
const preparedNativeGate =
@@ -1137,6 +1668,7 @@ export function buildPersistDelta(beforeSnapshot, afterSnapshot, options = {}) {
})
: null;
const nativeAttemptStartedAt = shouldCollectDiagnostics ? readPersistDeltaNow() : 0;
const nativeAttempt =
options?.useNativeDelta !== true
? {
@@ -1156,11 +1688,18 @@ export function buildPersistDelta(beforeSnapshot, afterSnapshot, options = {}) {
preparedContext,
options,
);
if (timings) {
timings.nativeAttemptMs = readPersistDeltaNow() - nativeAttemptStartedAt;
}
const nativeRawDelta = nativeAttempt.rawDelta;
const nativeIdDelta = normalizePersistDeltaIdShape(nativeRawDelta);
const hydrateStartedAt = shouldCollectDiagnostics ? readPersistDeltaNow() : 0;
const nativeDelta = nativeIdDelta
? buildPersistDeltaFromIdShape(preparedContext, nativeIdDelta)
: normalizePersistDeltaShape(nativeRawDelta);
if (timings && nativeRawDelta) {
timings.hydrateMs = readPersistDeltaNow() - hydrateStartedAt;
}
if (nativeRawDelta && !nativeDelta) {
if (options?.nativeFailOpen === false) {
throw new Error("native-persist-delta-invalid-result");
@@ -1201,6 +1740,19 @@ export function buildPersistDelta(beforeSnapshot, afterSnapshot, options = {}) {
structuralDelta: preparedContext.structuralDelta,
beforeSerializedChars: preparedContext.beforeSerializedChars,
afterSerializedChars: preparedContext.afterSerializedChars,
prepareMs: timings?.prepareMs || 0,
nativeAttemptMs: timings?.nativeAttemptMs || 0,
lookupMs: timings?.lookupMs || 0,
jsDiffMs: timings?.jsDiffMs || 0,
hydrateMs: timings?.hydrateMs || 0,
serializationCacheObjectHits: Number(serializationCacheStats.objectHitCount || 0),
serializationCacheTokenHits: Number(serializationCacheStats.tokenHitCount || 0),
serializationCacheMisses: Number(serializationCacheStats.missCount || 0),
serializationCacheHits: sumPersistSerializationCacheHits(
serializationCacheStats,
),
preparedRecordSetCacheHits: Number(preparedRecordSetCacheStats?.hitCount || 0),
preparedRecordSetCacheMisses: Number(preparedRecordSetCacheStats?.missCount || 0),
minCombinedSerializedChars:
preparedNativeGate?.minCombinedSerializedChars || 0,
buildMs: readPersistDeltaNow() - startedAt,
@@ -1214,31 +1766,50 @@ export function buildPersistDelta(beforeSnapshot, afterSnapshot, options = {}) {
return result;
}
const lookupStartedAt = shouldCollectDiagnostics ? readPersistDeltaNow() : 0;
const beforeNodeSerializedById = ensurePreparedSerializedLookup(
preparedContext.beforeNodes,
serializationCacheStats,
);
const afterNodeSerializedById = ensurePreparedSerializedLookup(
preparedContext.afterNodes,
serializationCacheStats,
);
const beforeEdgeSerializedById = ensurePreparedSerializedLookup(
preparedContext.beforeEdges,
serializationCacheStats,
);
const afterEdgeSerializedById = ensurePreparedSerializedLookup(
preparedContext.afterEdges,
serializationCacheStats,
);
const beforeTombstoneSerializedById = ensurePreparedSerializedLookup(
preparedContext.beforeTombstones,
serializationCacheStats,
);
const afterTombstoneSerializedById = ensurePreparedSerializedLookup(
preparedContext.afterTombstones,
serializationCacheStats,
);
const afterNodeRecordById = ensurePreparedRecordLookup(preparedContext.afterNodes);
const afterEdgeRecordById = ensurePreparedRecordLookup(preparedContext.afterEdges);
const afterTombstoneRecordById = ensurePreparedRecordLookup(
preparedContext.afterTombstones,
);
const afterTombstoneTargetKeyById = ensurePreparedTargetKeyLookup(
preparedContext.afterTombstones,
);
if (timings) {
timings.lookupMs = readPersistDeltaNow() - lookupStartedAt;
}
const jsDiffStartedAt = shouldCollectDiagnostics ? readPersistDeltaNow() : 0;
const upsertNodes = [];
for (const id of preparedContext.afterNodes.ids) {
if (
beforeNodeSerializedById.get(id) !== afterNodeSerializedById.get(id)
) {
const record = preparedContext.afterNodes.recordById?.get(id);
const record = afterNodeRecordById.get(id);
if (record) upsertNodes.push(record);
}
}
@@ -1248,7 +1819,7 @@ export function buildPersistDelta(beforeSnapshot, afterSnapshot, options = {}) {
if (
beforeEdgeSerializedById.get(id) !== afterEdgeSerializedById.get(id)
) {
const record = preparedContext.afterEdges.recordById?.get(id);
const record = afterEdgeRecordById.get(id);
if (record) upsertEdges.push(record);
}
}
@@ -1273,8 +1844,8 @@ export function buildPersistDelta(beforeSnapshot, afterSnapshot, options = {}) {
beforeTombstoneSerializedById.get(id) !==
afterTombstoneSerializedById.get(id)
) {
const record = preparedContext.afterTombstones.recordById?.get(id);
const targetKey = preparedContext.afterTombstones.targetKeyById?.get(id) || "";
const record = afterTombstoneRecordById.get(id);
const targetKey = afterTombstoneTargetKeyById.get(id) || "";
if (!record || !targetKey) continue;
tombstoneMap.set(targetKey, record);
}
@@ -1314,6 +1885,9 @@ export function buildPersistDelta(beforeSnapshot, afterSnapshot, options = {}) {
: {}),
},
};
if (timings) {
timings.jsDiffMs = readPersistDeltaNow() - jsDiffStartedAt;
}
if (shouldCollectDiagnostics) {
emitPersistDeltaDiagnostics(options, {
requestedNative: options?.useNativeDelta === true,
@@ -1332,6 +1906,19 @@ export function buildPersistDelta(beforeSnapshot, afterSnapshot, options = {}) {
structuralDelta: preparedContext.structuralDelta,
beforeSerializedChars: preparedContext.beforeSerializedChars,
afterSerializedChars: preparedContext.afterSerializedChars,
prepareMs: timings?.prepareMs || 0,
nativeAttemptMs: timings?.nativeAttemptMs || 0,
lookupMs: timings?.lookupMs || 0,
jsDiffMs: timings?.jsDiffMs || 0,
hydrateMs: timings?.hydrateMs || 0,
serializationCacheObjectHits: Number(serializationCacheStats.objectHitCount || 0),
serializationCacheTokenHits: Number(serializationCacheStats.tokenHitCount || 0),
serializationCacheMisses: Number(serializationCacheStats.missCount || 0),
serializationCacheHits: sumPersistSerializationCacheHits(
serializationCacheStats,
),
preparedRecordSetCacheHits: Number(preparedRecordSetCacheStats?.hitCount || 0),
preparedRecordSetCacheMisses: Number(preparedRecordSetCacheStats?.missCount || 0),
minCombinedSerializedChars:
preparedNativeGate?.minCombinedSerializedChars || 0,
buildMs: readPersistDeltaNow() - startedAt,