refactor: stabilize persistence delta commit flow

This commit is contained in:
Youzini-afk
2026-04-11 13:57:03 +08:00
parent ea94ec0cad
commit 1834bc1d24
10 changed files with 1182 additions and 589 deletions

View File

@@ -127,6 +127,17 @@ function normalizeMode(mode = "replace") {
return String(mode || "").toLowerCase() === "merge" ? "merge" : "replace";
}
const BME_PERSIST_META_RESERVED_KEYS = new Set([
"revision",
"lastModified",
"nodeCount",
"edgeCount",
"tombstoneCount",
"syncDirty",
"syncDirtyReason",
"lastMutationReason",
]);
function sanitizeSnapshot(snapshot = {}) {
if (!snapshot || typeof snapshot !== "object" || Array.isArray(snapshot)) {
return {
@@ -430,6 +441,153 @@ export function buildSnapshotFromGraph(graph, options = {}) {
};
}
function buildSnapshotRecordIndex(records = []) {
const map = new Map();
for (const record of toArray(records)) {
const id = normalizeRecordId(record?.id);
if (!id) continue;
map.set(id, JSON.stringify(record));
}
return map;
}
function buildSnapshotRecordArrayIndex(records = []) {
const map = new Map();
for (const record of toArray(records)) {
const id = normalizeRecordId(record?.id);
if (!id) continue;
map.set(id, toPlainData(record, record));
}
return map;
}
function buildRuntimeMetaPatch(snapshot = {}) {
const normalizedSnapshot = sanitizeSnapshot(snapshot);
const patch = {};
for (const [rawKey, value] of Object.entries(normalizedSnapshot.meta || {})) {
const key = normalizeRecordId(rawKey);
if (!key || BME_PERSIST_META_RESERVED_KEYS.has(key)) continue;
patch[key] = toPlainData(value, value);
}
const state = normalizeStateSnapshot(normalizedSnapshot);
patch.lastProcessedFloor = state.lastProcessedFloor;
patch.extractionCount = state.extractionCount;
patch.schemaVersion = BME_DB_SCHEMA_VERSION;
patch.chatId = normalizeChatId(
normalizedSnapshot.meta?.chatId || patch.chatId || "",
);
return patch;
}
function ensureDeleteTombstone(
tombstoneMap,
kind,
targetId,
deletedAt,
sourceDeviceId = "",
) {
const normalizedKind = normalizeRecordId(kind);
const normalizedTargetId = normalizeRecordId(targetId);
if (!normalizedKind || !normalizedTargetId) return;
const targetKey = `${normalizedKind}:${normalizedTargetId}`;
if (tombstoneMap.has(targetKey)) return;
tombstoneMap.set(targetKey, {
id: `${normalizedKind}:${normalizedTargetId}`,
kind: normalizedKind,
targetId: normalizedTargetId,
sourceDeviceId: normalizeRecordId(sourceDeviceId),
deletedAt: normalizeTimestamp(deletedAt),
});
}
export function buildPersistDelta(beforeSnapshot, afterSnapshot, options = {}) {
const normalizedBefore = sanitizeSnapshot(beforeSnapshot);
const normalizedAfter = sanitizeSnapshot(afterSnapshot);
const nowMs = normalizeTimestamp(options.nowMs, Date.now());
const beforeNodeJsonById = buildSnapshotRecordIndex(normalizedBefore.nodes);
const afterNodeJsonById = buildSnapshotRecordIndex(normalizedAfter.nodes);
const beforeEdgeJsonById = buildSnapshotRecordIndex(normalizedBefore.edges);
const afterEdgeJsonById = buildSnapshotRecordIndex(normalizedAfter.edges);
const beforeTombstoneJsonById = buildSnapshotRecordIndex(
normalizedBefore.tombstones,
);
const afterNodeById = buildSnapshotRecordArrayIndex(normalizedAfter.nodes);
const afterEdgeById = buildSnapshotRecordArrayIndex(normalizedAfter.edges);
const afterTombstoneById = buildSnapshotRecordArrayIndex(
normalizedAfter.tombstones,
);
const upsertNodes = [];
for (const [id, record] of afterNodeById.entries()) {
if (beforeNodeJsonById.get(id) !== JSON.stringify(record)) {
upsertNodes.push(record);
}
}
const upsertEdges = [];
for (const [id, record] of afterEdgeById.entries()) {
if (beforeEdgeJsonById.get(id) !== JSON.stringify(record)) {
upsertEdges.push(record);
}
}
const deleteNodeIds = [];
for (const id of beforeNodeJsonById.keys()) {
if (!afterNodeJsonById.has(id)) {
deleteNodeIds.push(id);
}
}
const deleteEdgeIds = [];
for (const id of beforeEdgeJsonById.keys()) {
if (!afterEdgeJsonById.has(id)) {
deleteEdgeIds.push(id);
}
}
const tombstoneMap = new Map();
for (const [id, record] of afterTombstoneById.entries()) {
if (beforeTombstoneJsonById.get(id) !== JSON.stringify(record)) {
tombstoneMap.set(`${record.kind}:${record.targetId}`, record);
}
}
for (const nodeId of deleteNodeIds) {
ensureDeleteTombstone(
tombstoneMap,
"node",
nodeId,
nowMs,
normalizedAfter.meta?.deviceId || normalizedBefore.meta?.deviceId || "",
);
}
for (const edgeId of deleteEdgeIds) {
ensureDeleteTombstone(
tombstoneMap,
"edge",
edgeId,
nowMs,
normalizedAfter.meta?.deviceId || normalizedBefore.meta?.deviceId || "",
);
}
return {
upsertNodes,
upsertEdges,
deleteNodeIds,
deleteEdgeIds,
tombstones: Array.from(tombstoneMap.values()),
runtimeMetaPatch: {
...buildRuntimeMetaPatch(normalizedAfter),
...(options.runtimeMetaPatch &&
typeof options.runtimeMetaPatch === "object" &&
!Array.isArray(options.runtimeMetaPatch)
? toPlainData(options.runtimeMetaPatch, {})
: {}),
},
};
}
export function buildGraphFromSnapshot(snapshot, options = {}) {
const normalizedSnapshot = sanitizeSnapshot(snapshot);
const chatId =
@@ -915,6 +1073,105 @@ export class BmeDatabase {
return true;
}
async commitDelta(delta = {}, options = {}) {
const db = await this.open();
const nowMs = Date.now();
const normalizedDelta =
delta && typeof delta === "object" && !Array.isArray(delta) ? delta : {};
const upsertNodes = this._normalizeNodeRecords(normalizedDelta.upsertNodes, nowMs);
const upsertEdges = this._normalizeEdgeRecords(normalizedDelta.upsertEdges, nowMs);
const tombstones = this._normalizeTombstoneRecords(
normalizedDelta.tombstones,
nowMs,
);
const deleteNodeIds = toArray(normalizedDelta.deleteNodeIds)
.map((value) => normalizeRecordId(value))
.filter(Boolean);
const deleteEdgeIds = toArray(normalizedDelta.deleteEdgeIds)
.map((value) => normalizeRecordId(value))
.filter(Boolean);
const runtimeMetaPatch =
normalizedDelta.runtimeMetaPatch &&
typeof normalizedDelta.runtimeMetaPatch === "object" &&
!Array.isArray(normalizedDelta.runtimeMetaPatch)
? normalizedDelta.runtimeMetaPatch
: {};
const reason = String(options.reason || "commitDelta");
const requestedRevision = normalizeRevision(options.requestedRevision);
const shouldMarkSyncDirty = options.markSyncDirty !== false;
let nextRevision = 0;
let counts = {
nodes: 0,
edges: 0,
tombstones: 0,
};
await db.transaction(
"rw",
db.table("nodes"),
db.table("edges"),
db.table("tombstones"),
db.table("meta"),
async () => {
if (deleteEdgeIds.length) {
await db.table("edges").bulkDelete(deleteEdgeIds);
}
if (deleteNodeIds.length) {
await db.table("nodes").bulkDelete(deleteNodeIds);
}
if (upsertNodes.length) {
await db.table("nodes").bulkPut(upsertNodes);
}
if (upsertEdges.length) {
await db.table("edges").bulkPut(upsertEdges);
}
if (tombstones.length) {
await db.table("tombstones").bulkPut(tombstones);
}
for (const [rawKey, value] of Object.entries(runtimeMetaPatch)) {
const key = normalizeRecordId(rawKey);
if (!key || BME_PERSIST_META_RESERVED_KEYS.has(key)) continue;
await this._setMetaInTx(db, key, value, nowMs);
}
counts = await this._updateCountMetaInTx(db, nowMs);
const currentRevision = normalizeRevision(
(await db.table("meta").get("revision"))?.value,
);
nextRevision = Math.max(currentRevision + 1, requestedRevision);
await this._setMetaInTx(db, "revision", nextRevision, nowMs);
await this._setMetaInTx(db, "lastModified", nowMs, nowMs);
await this._setMetaInTx(db, "lastMutationReason", reason, nowMs);
await this._setMetaInTx(db, "syncDirty", shouldMarkSyncDirty, nowMs);
await this._setMetaInTx(
db,
"syncDirtyReason",
shouldMarkSyncDirty ? reason : "",
nowMs,
);
},
);
return {
revision: nextRevision,
lastModified: nowMs,
imported: {
nodes: counts.nodes,
edges: counts.edges,
tombstones: counts.tombstones,
},
delta: {
upsertNodes: upsertNodes.length,
upsertEdges: upsertEdges.length,
deleteNodeIds: deleteNodeIds.length,
deleteEdgeIds: deleteEdgeIds.length,
tombstones: tombstones.length,
},
};
}
async bulkUpsertNodes(nodes = []) {
const records = this._normalizeNodeRecords(nodes);
if (!records.length) {