perf: add dirty persist and hydrate/layout optimizations

This commit is contained in:
Youzini-afk
2026-04-22 22:27:22 +08:00
parent 0b71a35a93
commit fb4dabeaf1
11 changed files with 1297 additions and 184 deletions

View File

@@ -23,6 +23,7 @@ const BATCH_JOURNAL_LIMIT = 96;
const MAINTENANCE_JOURNAL_LIMIT = 20;
export const BATCH_JOURNAL_VERSION = 2;
export const PROCESSED_MESSAGE_HASH_VERSION = 2;
const graphPersistDirtyStateByGraph = new WeakMap();
export const MANUAL_BACKUP_BATCH_JOURNAL_COVERAGE_KEY =
"manualBackupBatchJournalCoverage";
@@ -225,6 +226,301 @@ function getRequiredJournalCoverageStartFloor(graph, journals = []) {
return null;
}
function createGraphPersistDirtyState() {
return {
version: 0,
nodeUpserts: new Map(),
edgeUpserts: new Map(),
nodeDeletes: new Map(),
edgeDeletes: new Map(),
runtimeMetaDirty: false,
runtimeMetaVersion: 0,
fullSnapshotRequired: false,
fullSnapshotVersion: 0,
lastReason: "",
lastSource: "",
lastMutationAt: 0,
};
}
function getGraphPersistDirtyStateInternal(graph, create = false) {
if (!graph || typeof graph !== "object") {
return null;
}
let state = graphPersistDirtyStateByGraph.get(graph) || null;
if (!state && create) {
state = createGraphPersistDirtyState();
graphPersistDirtyStateByGraph.set(graph, state);
}
return state;
}
function bumpGraphPersistDirtyVersion(state, reason = "", source = "") {
if (!state || typeof state !== "object") return 0;
state.version = Math.max(0, Math.floor(Number(state.version || 0))) + 1;
state.lastReason = String(reason || "").trim();
state.lastSource = String(source || "").trim();
state.lastMutationAt = Date.now();
return state.version;
}
function buildRecordLookupById(records = []) {
const lookup = new Map();
for (const record of Array.isArray(records) ? records : []) {
if (!record || typeof record !== "object" || Array.isArray(record)) continue;
const id = String(record.id || "").trim();
if (!id || lookup.has(id)) continue;
lookup.set(id, record);
}
return lookup;
}
function normalizeDirtyRecordId(recordOrId, recordLookup = null) {
if (recordOrId && typeof recordOrId === "object" && !Array.isArray(recordOrId)) {
return String(recordOrId.id || "").trim();
}
const normalizedId = String(recordOrId || "").trim();
if (!normalizedId || !(recordLookup instanceof Map)) return normalizedId;
return recordLookup.has(normalizedId) ? normalizedId : "";
}
export function getGraphPersistDirtyStateSnapshot(graph) {
const state = getGraphPersistDirtyStateInternal(graph);
if (!state) return null;
return {
version: Math.max(0, Math.floor(Number(state.version || 0))),
nodeUpsertIds: Array.from(state.nodeUpserts.keys()),
edgeUpsertIds: Array.from(state.edgeUpserts.keys()),
deleteNodeIds: Array.from(state.nodeDeletes.keys()),
deleteEdgeIds: Array.from(state.edgeDeletes.keys()),
runtimeMetaDirty: state.runtimeMetaDirty === true,
runtimeMetaVersion: Math.max(0, Math.floor(Number(state.runtimeMetaVersion || 0))),
fullSnapshotRequired: state.fullSnapshotRequired === true,
fullSnapshotVersion: Math.max(0, Math.floor(Number(state.fullSnapshotVersion || 0))),
lastReason: String(state.lastReason || ""),
lastSource: String(state.lastSource || ""),
lastMutationAt: Math.max(0, Math.floor(Number(state.lastMutationAt || 0))),
};
}
export function hasGraphPersistDirtyState(graph) {
const snapshot = getGraphPersistDirtyStateSnapshot(graph);
if (!snapshot) return false;
return (
snapshot.nodeUpsertIds.length > 0 ||
snapshot.edgeUpsertIds.length > 0 ||
snapshot.deleteNodeIds.length > 0 ||
snapshot.deleteEdgeIds.length > 0 ||
snapshot.runtimeMetaDirty === true ||
snapshot.fullSnapshotRequired === true
);
}
export function cloneGraphPersistDirtyState(sourceGraph, targetGraph) {
const sourceState = getGraphPersistDirtyStateInternal(sourceGraph);
if (!sourceState || !targetGraph || typeof targetGraph !== "object") {
return targetGraph;
}
const targetState = createGraphPersistDirtyState();
const nodeById = buildRecordLookupById(targetGraph.nodes);
const edgeById = buildRecordLookupById(targetGraph.edges);
targetState.version = Math.max(0, Math.floor(Number(sourceState.version || 0)));
targetState.runtimeMetaDirty = sourceState.runtimeMetaDirty === true;
targetState.runtimeMetaVersion = Math.max(
0,
Math.floor(Number(sourceState.runtimeMetaVersion || 0)),
);
targetState.fullSnapshotRequired = sourceState.fullSnapshotRequired === true;
targetState.fullSnapshotVersion = Math.max(
0,
Math.floor(Number(sourceState.fullSnapshotVersion || 0)),
);
targetState.lastReason = String(sourceState.lastReason || "");
targetState.lastSource = String(sourceState.lastSource || "");
targetState.lastMutationAt = Math.max(
0,
Math.floor(Number(sourceState.lastMutationAt || 0)),
);
for (const [id, entry] of sourceState.nodeUpserts.entries()) {
const record = nodeById.get(id);
if (!record) continue;
targetState.nodeUpserts.set(id, {
version: Math.max(0, Math.floor(Number(entry?.version || 0))),
record,
});
}
for (const [id, entry] of sourceState.edgeUpserts.entries()) {
const record = edgeById.get(id);
if (!record) continue;
targetState.edgeUpserts.set(id, {
version: Math.max(0, Math.floor(Number(entry?.version || 0))),
record,
});
}
for (const [id, version] of sourceState.nodeDeletes.entries()) {
targetState.nodeDeletes.set(id, Math.max(0, Math.floor(Number(version || 0))));
}
for (const [id, version] of sourceState.edgeDeletes.entries()) {
targetState.edgeDeletes.set(id, Math.max(0, Math.floor(Number(version || 0))));
}
graphPersistDirtyStateByGraph.set(targetGraph, targetState);
return targetGraph;
}
export function pruneGraphPersistDirtyState(graph, committedVersion = 0) {
const state = getGraphPersistDirtyStateInternal(graph);
const normalizedCommittedVersion = Math.max(
0,
Math.floor(Number(committedVersion || 0)),
);
if (!state || normalizedCommittedVersion <= 0) {
return getGraphPersistDirtyStateSnapshot(graph);
}
for (const [id, entry] of state.nodeUpserts.entries()) {
if (Math.max(0, Math.floor(Number(entry?.version || 0))) <= normalizedCommittedVersion) {
state.nodeUpserts.delete(id);
}
}
for (const [id, entry] of state.edgeUpserts.entries()) {
if (Math.max(0, Math.floor(Number(entry?.version || 0))) <= normalizedCommittedVersion) {
state.edgeUpserts.delete(id);
}
}
for (const [id, version] of state.nodeDeletes.entries()) {
if (Math.max(0, Math.floor(Number(version || 0))) <= normalizedCommittedVersion) {
state.nodeDeletes.delete(id);
}
}
for (const [id, version] of state.edgeDeletes.entries()) {
if (Math.max(0, Math.floor(Number(version || 0))) <= normalizedCommittedVersion) {
state.edgeDeletes.delete(id);
}
}
if (state.runtimeMetaDirty && state.runtimeMetaVersion <= normalizedCommittedVersion) {
state.runtimeMetaDirty = false;
state.runtimeMetaVersion = 0;
}
if (
state.fullSnapshotRequired &&
state.fullSnapshotVersion <= normalizedCommittedVersion
) {
state.fullSnapshotRequired = false;
state.fullSnapshotVersion = 0;
}
if (!hasGraphPersistDirtyState(graph)) {
state.lastReason = "";
state.lastSource = "";
state.lastMutationAt = 0;
}
return getGraphPersistDirtyStateSnapshot(graph);
}
export function markGraphPersistNodeUpsert(
graph,
recordOrId,
reason = "",
source = "",
) {
normalizeGraphRuntimeState(graph, graph?.historyState?.chatId || "");
const state = getGraphPersistDirtyStateInternal(graph, true);
const recordLookup = buildRecordLookupById(graph?.nodes);
const normalizedId = normalizeDirtyRecordId(recordOrId, recordLookup);
const record =
recordOrId && typeof recordOrId === "object" && !Array.isArray(recordOrId)
? recordOrId
: recordLookup.get(normalizedId) || null;
if (!normalizedId || !record) return false;
const version = bumpGraphPersistDirtyVersion(state, reason, source);
state.nodeUpserts.set(normalizedId, { version, record });
state.nodeDeletes.delete(normalizedId);
return true;
}
export function markGraphPersistEdgeUpsert(
graph,
recordOrId,
reason = "",
source = "",
) {
normalizeGraphRuntimeState(graph, graph?.historyState?.chatId || "");
const state = getGraphPersistDirtyStateInternal(graph, true);
const recordLookup = buildRecordLookupById(graph?.edges);
const normalizedId = normalizeDirtyRecordId(recordOrId, recordLookup);
const record =
recordOrId && typeof recordOrId === "object" && !Array.isArray(recordOrId)
? recordOrId
: recordLookup.get(normalizedId) || null;
if (!normalizedId || !record) return false;
const version = bumpGraphPersistDirtyVersion(state, reason, source);
state.edgeUpserts.set(normalizedId, { version, record });
state.edgeDeletes.delete(normalizedId);
return true;
}
export function markGraphPersistNodeDelete(
graph,
nodeId,
reason = "",
source = "",
) {
normalizeGraphRuntimeState(graph, graph?.historyState?.chatId || "");
const normalizedId = String(nodeId || "").trim();
if (!normalizedId) return false;
const state = getGraphPersistDirtyStateInternal(graph, true);
const version = bumpGraphPersistDirtyVersion(state, reason, source);
state.nodeUpserts.delete(normalizedId);
state.nodeDeletes.set(normalizedId, version);
return true;
}
export function markGraphPersistEdgeDelete(
graph,
edgeId,
reason = "",
source = "",
) {
normalizeGraphRuntimeState(graph, graph?.historyState?.chatId || "");
const normalizedId = String(edgeId || "").trim();
if (!normalizedId) return false;
const state = getGraphPersistDirtyStateInternal(graph, true);
const version = bumpGraphPersistDirtyVersion(state, reason, source);
state.edgeUpserts.delete(normalizedId);
state.edgeDeletes.set(normalizedId, version);
return true;
}
export function markGraphPersistRuntimeMetaDirty(
graph,
reason = "",
source = "",
) {
normalizeGraphRuntimeState(graph, graph?.historyState?.chatId || "");
const state = getGraphPersistDirtyStateInternal(graph, true);
const version = bumpGraphPersistDirtyVersion(state, reason, source);
state.runtimeMetaDirty = true;
state.runtimeMetaVersion = version;
return true;
}
export function markGraphPersistFullSnapshotRequired(
graph,
reason = "",
source = "",
) {
normalizeGraphRuntimeState(graph, graph?.historyState?.chatId || "");
const state = getGraphPersistDirtyStateInternal(graph, true);
const version = bumpGraphPersistDirtyVersion(state, reason, source);
state.runtimeMetaDirty = true;
state.runtimeMetaVersion = version;
state.fullSnapshotRequired = true;
state.fullSnapshotVersion = version;
return true;
}
export function normalizeGraphRuntimeState(graph, chatId = "", options = {}) {
if (!graph || typeof graph !== "object") {
return graph;
@@ -600,6 +896,11 @@ export function applyProcessedHistorySnapshotToGraph(
: {};
historyState.processedMessageHashesNeedRefresh = false;
graph.lastProcessedSeq = safeLastProcessedAssistantFloor;
markGraphPersistRuntimeMetaDirty(
graph,
"processed-history-snapshot",
"runtime.history",
);
return graph;
}
@@ -655,6 +956,11 @@ export function rebindProcessedHistoryStateToChat(
: {};
historyState.processedMessageHashesNeedRefresh = false;
graph.lastProcessedSeq = safeLastProcessedAssistantFloor;
markGraphPersistRuntimeMetaDirty(
graph,
"history-state-rebound",
"runtime.history",
);
return {
rebound: true,
@@ -775,6 +1081,7 @@ export function markHistoryDirty(graph, floor, reason = "", source = "") {
reason: graph.historyState.lastMutationReason,
detectionSource: graph.historyState.lastMutationSource || "",
};
markGraphPersistRuntimeMetaDirty(graph, reason || "history-dirty", source || "runtime.history");
}
export function clearHistoryDirty(graph, result = null) {
@@ -794,6 +1101,7 @@ export function clearHistoryDirty(graph, result = null) {
if (result) {
graph.historyState.lastRecoveryResult = result;
}
markGraphPersistRuntimeMetaDirty(graph, "history-dirty-cleared", "runtime.history");
}
function buildNodeMap(nodes = []) {
@@ -1052,6 +1360,11 @@ export function appendBatchJournal(graph, entry) {
graph.historyState?.[MANUAL_BACKUP_BATCH_JOURNAL_COVERAGE_KEY],
graph.batchJournal,
);
markGraphPersistRuntimeMetaDirty(
graph,
"batch-journal-appended",
"runtime.batch-journal",
);
}
export function createMaintenanceJournalEntry(
@@ -1158,6 +1471,11 @@ export function appendMaintenanceJournal(graph, entry) {
-MAINTENANCE_JOURNAL_LIMIT,
);
}
markGraphPersistRuntimeMetaDirty(
graph,
"maintenance-journal-appended",
"runtime.maintenance",
);
}
export function getLatestMaintenanceJournalEntry(graph) {
@@ -1241,6 +1559,11 @@ export function applyMaintenanceInversePatch(graph, inversePatch = {}) {
}
sanitizeGraphReferences(graph);
markGraphPersistFullSnapshotRequired(
graph,
"maintenance-inverse-patch",
"runtime.maintenance",
);
return graph;
}
@@ -1266,6 +1589,7 @@ export function undoLatestMaintenance(graph) {
applyMaintenanceInversePatch(graph, entry.inversePatch || {});
graph.maintenanceJournal = graph.maintenanceJournal.slice(0, -1);
markGraphPersistRuntimeMetaDirty(graph, "maintenance-undo", "runtime.maintenance");
return {
ok: true,
@@ -1387,6 +1711,7 @@ export function rollbackBatch(graph, journal) {
applyJournalStateBefore(graph, journal.stateBefore || {});
sanitizeGraphReferences(graph);
markGraphPersistFullSnapshotRequired(graph, "rollback-batch", "runtime.batch-journal");
return graph;
}