From c12956f674f6f9dd739ce58cbe1614e43bb0742d Mon Sep 17 00:00:00 2001 From: Youzini-afk <13153778771cx@gmail.com> Date: Tue, 14 Apr 2026 19:34:31 +0800 Subject: [PATCH] feat: promote opfs v2 as default local store --- index.js | 476 ++++++-- maintenance/extraction-controller.js | 7 + runtime/settings-defaults.js | 2 +- sync/bme-db.js | 97 +- sync/bme-opfs-store.js | 1610 +++++++++++++++++++++++++- sync/bme-sync.js | 246 +++- tests/indexeddb-sync.mjs | 23 +- tests/opfs-persistence.mjs | 51 +- ui/panel.js | 21 + ui/ui-status.js | 7 + 10 files changed, 2397 insertions(+), 143 deletions(-) diff --git a/index.js b/index.js index a10aacf..5d3dd58 100644 --- a/index.js +++ b/index.js @@ -27,7 +27,9 @@ import { ensureDexieLoaded, } from "./sync/bme-db.js"; import { + BME_GRAPH_LOCAL_STORAGE_MODE_AUTO, BME_GRAPH_LOCAL_STORAGE_MODE_INDEXEDDB, + BME_GRAPH_LOCAL_STORAGE_MODE_OPFS_PRIMARY, BME_GRAPH_LOCAL_STORAGE_MODE_OPFS_SHADOW, OpfsGraphStore, detectOpfsSupport, @@ -1227,10 +1229,25 @@ function getGraphPersistenceLiveState() { updatedAt: graphPersistenceState.updatedAt, storagePrimary: graphPersistenceState.storagePrimary || "indexeddb", storageMode: graphPersistenceState.storageMode || "indexeddb", + resolvedLocalStore: String( + graphPersistenceState.resolvedLocalStore || + buildGraphLocalStoreSelectorKey(getPreferredGraphLocalStorePresentationSync()), + ), + localStoreFormatVersion: Number(graphPersistenceState.localStoreFormatVersion || 0) || 1, + localStoreMigrationState: String( + graphPersistenceState.localStoreMigrationState || "idle", + ), opfsWriteLockState: cloneRuntimeDebugValue( graphPersistenceState.opfsWriteLockState, null, ), + opfsWalDepth: Number(graphPersistenceState.opfsWalDepth || 0), + opfsPendingBytes: Number(graphPersistenceState.opfsPendingBytes || 0), + opfsCompactionState: cloneRuntimeDebugValue( + graphPersistenceState.opfsCompactionState, + null, + ), + remoteSyncFormatVersion: Number(graphPersistenceState.remoteSyncFormatVersion || 0) || 1, dbReady: graphPersistenceState.dbReady ?? isGraphLoadStateDbReady(graphPersistenceState.loadState), @@ -3880,14 +3897,18 @@ function buildIndexedDbStorePresentation() { } function buildOpfsStorePresentation( - mode = BME_GRAPH_LOCAL_STORAGE_MODE_OPFS_SHADOW, + mode = BME_GRAPH_LOCAL_STORAGE_MODE_OPFS_PRIMARY, ) { + const normalizedMode = normalizeGraphLocalStorageMode( + mode, + BME_GRAPH_LOCAL_STORAGE_MODE_OPFS_PRIMARY, + ); return { storagePrimary: "opfs", - storageMode: normalizeGraphLocalStorageMode( - mode, - BME_GRAPH_LOCAL_STORAGE_MODE_OPFS_SHADOW, - ), + storageMode: + normalizedMode === BME_GRAPH_LOCAL_STORAGE_MODE_OPFS_SHADOW + ? BME_GRAPH_LOCAL_STORAGE_MODE_OPFS_PRIMARY + : normalizedMode, statusLabel: "OPFS", reasonPrefix: "opfs", }; @@ -3900,7 +3921,7 @@ function getRequestedGraphLocalStorageMode(settings = getSettings()) { : {}; return normalizeGraphLocalStorageMode( sourceSettings.graphLocalStorageMode, - BME_GRAPH_LOCAL_STORAGE_MODE_INDEXEDDB, + "auto", ); } @@ -3911,6 +3932,34 @@ function resolveDbGraphStorePresentation(db = null) { return buildIndexedDbStorePresentation(); } +function readLocalStoreDiagnosticsSync( + db = null, + presentation = buildIndexedDbStorePresentation(), +) { + const resolvedPresentation = + presentation && typeof presentation === "object" + ? presentation + : resolveDbGraphStorePresentation(db); + const rawDiagnostics = + typeof db?.getStorageDiagnosticsSync === "function" + ? db.getStorageDiagnosticsSync() + : null; + return { + resolvedLocalStore: buildGraphLocalStoreSelectorKey(resolvedPresentation), + localStoreFormatVersion: + Number(rawDiagnostics?.formatVersion || 0) || + (resolvedPresentation.storagePrimary === "opfs" ? 2 : 1), + localStoreMigrationState: + String(rawDiagnostics?.migrationState || "").trim() || "idle", + opfsWalDepth: Number(rawDiagnostics?.walCount || 0), + opfsPendingBytes: Number(rawDiagnostics?.walTotalBytes || 0), + opfsCompactionState: cloneRuntimeDebugValue( + rawDiagnostics?.compactionState || null, + null, + ), + }; +} + function resolveSnapshotGraphStorePresentation( snapshot = null, fallbackPresentation = buildIndexedDbStorePresentation(), @@ -3945,10 +3994,10 @@ function buildGraphLocalStoreSelectorKey( ? "opfs" : "indexeddb"; const storageMode = - storagePrimary === "opfs" + storagePrimary === "opfs" ? normalizeGraphLocalStorageMode( normalizedPresentation.storageMode, - BME_GRAPH_LOCAL_STORAGE_MODE_OPFS_SHADOW, + BME_GRAPH_LOCAL_STORAGE_MODE_OPFS_PRIMARY, ) : BME_GRAPH_LOCAL_STORAGE_MODE_INDEXEDDB; return `${storagePrimary}:${storageMode}`; @@ -4003,6 +4052,12 @@ async function getGraphLocalStoreCapability(forceRefresh = false) { function getPreferredGraphLocalStorePresentationSync(settings = getSettings()) { const requestedMode = getRequestedGraphLocalStorageMode(settings); + if ( + requestedMode === "auto" && + bmeLocalStoreCapabilitySnapshot?.opfsAvailable + ) { + return buildOpfsStorePresentation(BME_GRAPH_LOCAL_STORAGE_MODE_OPFS_PRIMARY); + } if ( isGraphLocalStorageModeOpfs(requestedMode) && bmeLocalStoreCapabilitySnapshot?.opfsAvailable @@ -4016,6 +4071,12 @@ function getPreferredGraphLocalStorePresentationSync(settings = getSettings()) { settings = getSettings(), ) { const requestedMode = getRequestedGraphLocalStorageMode(settings); + if (requestedMode === "auto") { + const capability = await getGraphLocalStoreCapability(); + return capability.opfsAvailable + ? buildOpfsStorePresentation(BME_GRAPH_LOCAL_STORAGE_MODE_OPFS_PRIMARY) + : buildIndexedDbStorePresentation(); + } if (!isGraphLocalStorageModeOpfs(requestedMode)) { return buildIndexedDbStorePresentation(); } @@ -4843,6 +4904,18 @@ async function syncIndexedDbMetaToPersistenceState( } const storePresentation = resolveDbGraphStorePresentation(db); + const localStoreDiagnostics = + typeof readLocalStoreDiagnosticsSync === "function" + ? readLocalStoreDiagnosticsSync(db, storePresentation) + : { + resolvedLocalStore: `${storePresentation?.storagePrimary || "indexeddb"}:${storePresentation?.storageMode || "indexeddb"}`, + localStoreFormatVersion: + storePresentation.storagePrimary === "opfs" ? 2 : 1, + localStoreMigrationState: "idle", + opfsWalDepth: 0, + opfsPendingBytes: 0, + opfsCompactionState: null, + }; const persistenceEnvironment = buildPersistenceEnvironment( getContext(), storePresentation, @@ -4858,6 +4931,7 @@ async function syncIndexedDbMetaToPersistenceState( lastBackupRestoredAt, lastBackupRollbackAt, lastBackupFilename, + remoteSyncFormatVersion, ] = await Promise.all([ typeof db.getRevision === "function" ? db.getRevision() : 0, typeof db.getMeta === "function" ? db.getMeta("syncDirty", false) : false, @@ -4879,6 +4953,7 @@ async function syncIndexedDbMetaToPersistenceState( ? db.getMeta("lastBackupRollbackAt", 0) : 0, typeof db.getMeta === "function" ? db.getMeta("lastBackupFilename", "") : "", + typeof db.getMeta === "function" ? db.getMeta("remoteSyncFormatVersion", 1) : 1, ]); const patch = { @@ -4887,6 +4962,12 @@ async function syncIndexedDbMetaToPersistenceState( cacheStorageTier: persistenceEnvironment.cacheStorageTier, storagePrimary: storePresentation.storagePrimary, storageMode: storePresentation.storageMode, + resolvedLocalStore: localStoreDiagnostics.resolvedLocalStore, + localStoreFormatVersion: localStoreDiagnostics.localStoreFormatVersion, + localStoreMigrationState: localStoreDiagnostics.localStoreMigrationState, + opfsWalDepth: localStoreDiagnostics.opfsWalDepth, + opfsPendingBytes: localStoreDiagnostics.opfsPendingBytes, + opfsCompactionState: localStoreDiagnostics.opfsCompactionState, indexedDbRevision: normalizeIndexedDbRevision(revision), syncState: normalizeGraphSyncState(syncState), syncDirty: Boolean(syncDirty), @@ -4898,6 +4979,10 @@ async function syncIndexedDbMetaToPersistenceState( lastBackupRestoredAt: Number(lastBackupRestoredAt) || 0, lastBackupRollbackAt: Number(lastBackupRollbackAt) || 0, lastBackupFilename: String(lastBackupFilename || ""), + remoteSyncFormatVersion: + Number(remoteSyncFormatVersion || 0) || + Number(graphPersistenceState.remoteSyncFormatVersion || 0) || + 1, lastSyncError: String(lastSyncError || ""), opfsWriteLockState: typeof db.getWriteLockSnapshot === "function" @@ -6730,6 +6815,10 @@ async function loadGraphFromIndexedDb( updateGraphPersistenceState({ storagePrimary: recoveredStore.storagePrimary, storageMode: recoveredStore.storageMode, + resolvedLocalStore: buildGraphLocalStoreSelectorKey(recoveredStore), + localStoreFormatVersion: + recoveredStore.storagePrimary === "opfs" ? 2 : 1, + localStoreMigrationState: "completed", indexedDbRevision: recoveredRevision, indexedDbLastError: "", lastSyncError: "", @@ -6790,6 +6879,10 @@ async function loadGraphFromIndexedDb( updateGraphPersistenceState({ storagePrimary: migratedStore.storagePrimary, storageMode: migratedStore.storageMode, + resolvedLocalStore: buildGraphLocalStoreSelectorKey(migratedStore), + localStoreFormatVersion: + migratedStore.storagePrimary === "opfs" ? 2 : 1, + localStoreMigrationState: "completed", indexedDbRevision: migratedRevision, indexedDbLastError: "", lastSyncError: "", @@ -6809,6 +6902,7 @@ async function loadGraphFromIndexedDb( indexedDbLastError: String( migrationResult?.error || "migration-failed", ), + localStoreMigrationState: "failed", dualWriteLastResult: { action: "migration", source: "chat_metadata", @@ -7855,6 +7949,7 @@ async function persistGraphToConfiguredDurableTier( revision, reason, lastProcessedAssistantFloor = null, + persistDelta = null, } = {}, ) { const preferredLocalStore = getPreferredGraphLocalStorePresentationSync(); @@ -7924,6 +8019,7 @@ async function persistGraphToConfiguredDurableTier( reason: `${reason}:local-cache-mirror`, persistRole: "cache-mirror", scheduleCloudUpload: false, + persistDelta, }); } return buildGraphPersistResult({ @@ -7944,6 +8040,7 @@ async function persistGraphToConfiguredDurableTier( const indexedDbResult = await saveGraphToIndexedDb(chatId, graph, { revision, reason, + persistDelta, }); if (indexedDbResult?.saved) { persistGraphCommitMarker(context, { @@ -8021,6 +8118,7 @@ async function persistGraphToConfiguredDurableTier( queueGraphPersistToIndexedDb(chatId, graph, { revision: acceptedRevision, reason: `${reason}:chat-state-fallback:promote-indexeddb`, + persistDelta, }); return buildGraphPersistResult({ saved: true, @@ -8630,6 +8728,7 @@ async function persistExtractionBatchResult({ reason = "extraction-batch-complete", lastProcessedAssistantFloor = null, graphSnapshot = null, + persistDelta = null, } = {}) { ensureCurrentGraphRuntimeState(); const context = getContext(); @@ -8667,6 +8766,7 @@ async function persistExtractionBatchResult({ revision, reason, lastProcessedAssistantFloor, + persistDelta, }, ); if (acceptedPersistResult?.accepted) { @@ -10392,14 +10492,15 @@ async function saveGraphToIndexedDb( reason = "graph-save", persistRole = "primary", scheduleCloudUpload: scheduleCloudUploadOption = undefined, + persistDelta = null, } = {}, ) { const normalizedChatId = normalizeChatIdCandidate(chatId); - if (!normalizedChatId || !graph) { + if (!normalizedChatId || (!graph && !persistDelta)) { return { saved: false, chatId: normalizedChatId, - reason: "indexeddb-missing-chat-or-graph", + reason: "indexeddb-missing-chat-graph-or-delta", revision: normalizeIndexedDbRevision(revision), }; } @@ -10420,70 +10521,107 @@ async function saveGraphToIndexedDb( db = await manager.getCurrentDb(normalizedChatId); localStore = resolveDbGraphStorePresentation(db); const persistenceEnvironment = buildPersistenceEnvironment(context, localStore); - const currentIdentity = resolveCurrentChatIdentity(context); - const baseSnapshot = - readCachedIndexedDbSnapshot(normalizedChatId, localStore) || - (await db.exportSnapshot()); - const requestedRevision = resolvePersistRevisionFloor(revision, graph); - const snapshot = buildSnapshotFromGraph(graph, { - chatId: normalizedChatId, - revision: requestedRevision, - baseSnapshot, - lastModified: Date.now(), - meta: { - storagePrimary: localStore.storagePrimary, - storageMode: localStore.storageMode, - lastMutationReason: String(reason || "graph-save"), - integrity: - currentIdentity.integrity || graphPersistenceState.metadataIntegrity, - hostChatId: currentIdentity.hostChatId || "", - }, - }); - const currentSettings = getSettings(); const localStoreTier = resolveLocalStoreTierFromPresentation(localStore); + const currentIdentity = resolveCurrentChatIdentity(context); + const requestedRevision = resolvePersistRevisionFloor(revision, graph); + const currentSettings = getSettings(); const shouldScheduleCloudUpload = scheduleCloudUploadOption != null ? scheduleCloudUploadOption === true : persistenceEnvironment.hostProfile !== "luker" && persistRole !== "cache-mirror"; + const directPersistDelta = + persistDelta && + typeof persistDelta === "object" && + !Array.isArray(persistDelta) + ? cloneRuntimeDebugValue(persistDelta, persistDelta) + : null; + let baseSnapshot = null; + let snapshot = null; + let delta = directPersistDelta; + let persistDeltaBuildDiagnostics = null; + let nativePersistModuleStatus = null; + let nativePersistPreloadStatus = "not-requested"; + let nativePersistPreloadError = ""; + let nativePersistPreloadMs = 0; + const persistDeltaStartedAt = readPersistDeltaDiagnosticsNow(); + + if (!delta) { + baseSnapshot = + readCachedIndexedDbSnapshot(normalizedChatId, localStore) || + (await db.exportSnapshot()); + snapshot = buildSnapshotFromGraph(graph, { + chatId: normalizedChatId, + revision: requestedRevision, + baseSnapshot, + lastModified: Date.now(), + meta: { + storagePrimary: localStore.storagePrimary, + storageMode: localStore.storageMode, + lastMutationReason: String(reason || "graph-save"), + integrity: + currentIdentity.integrity || graphPersistenceState.metadataIntegrity, + hostChatId: currentIdentity.hostChatId || "", + }, + }); + } const nativePersistBridgeMode = String( currentSettings.persistNativeDeltaBridgeMode || "json", ); - const nativePersistRequested = currentSettings.persistUseNativeDelta === true; + const nativePersistRequested = + !directPersistDelta && currentSettings.persistUseNativeDelta === true; const nativePersistForceDisabled = currentSettings.graphNativeForceDisable === true; - const nativePersistGate = evaluatePersistNativeDeltaGate( - baseSnapshot, - snapshot, - currentSettings, - ); + const nativePersistGate = + baseSnapshot && snapshot + ? evaluatePersistNativeDeltaGate(baseSnapshot, snapshot, currentSettings) + : { + allowed: false, + reasons: ["direct-delta"], + minSnapshotRecords: Number( + currentSettings.persistNativeDeltaThresholdRecords || 0, + ), + minStructuralDelta: Number( + currentSettings.persistNativeDeltaThresholdStructuralDelta || 0, + ), + minCombinedSerializedChars: Number( + currentSettings.persistNativeDeltaThresholdSerializedChars || 0, + ), + beforeRecordCount: 0, + afterRecordCount: 0, + maxSnapshotRecords: 0, + structuralDelta: 0, + }; const shouldUseNativePersistDelta = nativePersistRequested && nativePersistForceDisabled !== true && nativePersistGate.allowed; - const persistDeltaStartedAt = readPersistDeltaDiagnosticsNow(); - let persistDeltaBuildDiagnostics = null; - let nativePersistModuleStatus = null; - let nativePersistPreloadStatus = nativePersistRequested - ? nativePersistForceDisabled - ? "force-disabled" - : nativePersistGate.allowed - ? "pending" - : "gated-out" - : "not-requested"; - let nativePersistPreloadError = ""; - let nativePersistPreloadMs = 0; + if (!directPersistDelta) { + nativePersistPreloadStatus = nativePersistRequested + ? nativePersistForceDisabled + ? "force-disabled" + : nativePersistGate.allowed + ? "pending" + : "gated-out" + : "not-requested"; + } updatePersistDeltaDiagnostics({ chatId: normalizedChatId, saveReason: String(reason || "graph-save"), requestedRevision, requestedNative: nativePersistRequested, - requestedBridgeMode: nativePersistBridgeMode, + requestedBridgeMode: directPersistDelta ? "direct-delta" : nativePersistBridgeMode, nativeForceDisabled: nativePersistForceDisabled, nativeFailOpen: currentSettings.nativeEngineFailOpen !== false, - gateAllowed: nativePersistGate.allowed, - gateReasons: cloneRuntimeDebugValue(nativePersistGate.reasons, []), - preloadGateAllowed: nativePersistGate.allowed, - preloadGateReasons: cloneRuntimeDebugValue(nativePersistGate.reasons, []), + gateAllowed: directPersistDelta ? true : nativePersistGate.allowed, + gateReasons: cloneRuntimeDebugValue( + directPersistDelta ? ["direct-delta"] : nativePersistGate.reasons, + [], + ), + preloadGateAllowed: directPersistDelta ? true : nativePersistGate.allowed, + preloadGateReasons: cloneRuntimeDebugValue( + directPersistDelta ? ["direct-delta"] : nativePersistGate.reasons, + [], + ), minSnapshotRecords: nativePersistGate.minSnapshotRecords, minStructuralDelta: nativePersistGate.minStructuralDelta, minCombinedSerializedChars: nativePersistGate.minCombinedSerializedChars, @@ -10495,8 +10633,9 @@ async function saveGraphToIndexedDb( preloadMs: 0, preloadError: "", status: "building", + path: directPersistDelta ? "direct-delta" : undefined, }); - if (shouldUseNativePersistDelta) { + if (!directPersistDelta && shouldUseNativePersistDelta) { const preloadStartedAt = readPersistDeltaDiagnosticsNow(); try { if (!nativePersistDeltaInstallPromise) { @@ -10528,20 +10667,70 @@ async function saveGraphToIndexedDb( } } } - const delta = buildPersistDelta(baseSnapshot, snapshot, { - useNativeDelta: shouldUseNativePersistDelta, - nativeFailOpen: currentSettings.nativeEngineFailOpen !== false, - persistNativeDeltaThresholdRecords: - currentSettings.persistNativeDeltaThresholdRecords, - persistNativeDeltaThresholdStructuralDelta: - currentSettings.persistNativeDeltaThresholdStructuralDelta, - persistNativeDeltaThresholdSerializedChars: - currentSettings.persistNativeDeltaThresholdSerializedChars, - persistNativeDeltaBridgeMode: nativePersistBridgeMode, - onDiagnostics(snapshot) { - persistDeltaBuildDiagnostics = snapshot; - }, - }); + if (!delta) { + delta = buildPersistDelta(baseSnapshot, snapshot, { + useNativeDelta: shouldUseNativePersistDelta, + nativeFailOpen: currentSettings.nativeEngineFailOpen !== false, + persistNativeDeltaThresholdRecords: + currentSettings.persistNativeDeltaThresholdRecords, + persistNativeDeltaThresholdStructuralDelta: + currentSettings.persistNativeDeltaThresholdStructuralDelta, + persistNativeDeltaThresholdSerializedChars: + currentSettings.persistNativeDeltaThresholdSerializedChars, + persistNativeDeltaBridgeMode: nativePersistBridgeMode, + onDiagnostics(snapshotValue) { + persistDeltaBuildDiagnostics = snapshotValue; + }, + }); + } else { + persistDeltaBuildDiagnostics = { + requestedNative: false, + requestedBridgeMode: "direct-delta", + usedNative: false, + path: "direct-delta", + gateAllowed: true, + gateReasons: ["direct-delta"], + nativeAttemptStatus: "not-requested", + nativeError: "", + beforeRecordCount: Number( + delta?.countDelta?.previous?.nodes || 0, + ) + Number(delta?.countDelta?.previous?.edges || 0), + afterRecordCount: Number( + delta?.countDelta?.next?.nodes || 0, + ) + Number(delta?.countDelta?.next?.edges || 0), + maxSnapshotRecords: Math.max( + Number(delta?.countDelta?.previous?.nodes || 0) + + Number(delta?.countDelta?.previous?.edges || 0), + Number(delta?.countDelta?.next?.nodes || 0) + + Number(delta?.countDelta?.next?.edges || 0), + ), + structuralDelta: + Number(delta?.upsertNodes?.length || 0) + + Number(delta?.upsertEdges?.length || 0) + + Number(delta?.deleteNodeIds?.length || 0) + + Number(delta?.deleteEdgeIds?.length || 0), + beforeSerializedChars: 0, + afterSerializedChars: 0, + combinedSerializedChars: 0, + prepareMs: 0, + nativeAttemptMs: 0, + lookupMs: 0, + jsDiffMs: 0, + hydrateMs: 0, + serializationCacheObjectHits: 0, + serializationCacheTokenHits: 0, + serializationCacheMisses: 0, + serializationCacheHits: 0, + preparedRecordSetCacheHits: 0, + preparedRecordSetCacheMisses: 0, + minCombinedSerializedChars: 0, + upsertNodeCount: Number(delta?.upsertNodes?.length || 0), + upsertEdgeCount: Number(delta?.upsertEdges?.length || 0), + deleteNodeCount: Number(delta?.deleteNodeIds?.length || 0), + deleteEdgeCount: Number(delta?.deleteEdgeIds?.length || 0), + tombstoneCount: Number(delta?.tombstones?.length || 0), + }; + } const commitResult = await db.commitDelta(delta, { reason, requestedRevision, @@ -10549,19 +10738,35 @@ async function saveGraphToIndexedDb( }); let scheduleUploadWarning = ""; - snapshot.meta.revision = normalizeIndexedDbRevision( - commitResult?.revision, - requestedRevision, - ); - snapshot.meta.lastModified = Number(commitResult?.lastModified || Date.now()); - snapshot.meta.lastMutationReason = String(reason || "graph-save"); - snapshot.meta.storagePrimary = localStore.storagePrimary; - snapshot.meta.storageMode = localStore.storageMode; - cacheIndexedDbSnapshot(normalizedChatId, snapshot); + if (graph) { + snapshot = buildSnapshotFromGraph(graph, { + chatId: normalizedChatId, + revision: normalizeIndexedDbRevision(commitResult?.revision, requestedRevision), + baseSnapshot: baseSnapshot || undefined, + lastModified: Number(commitResult?.lastModified || Date.now()), + meta: { + storagePrimary: localStore.storagePrimary, + storageMode: localStore.storageMode, + lastMutationReason: String(reason || "graph-save"), + integrity: + currentIdentity.integrity || graphPersistenceState.metadataIntegrity, + hostChatId: currentIdentity.hostChatId || "", + }, + }); + snapshot.meta.revision = normalizeIndexedDbRevision( + commitResult?.revision, + requestedRevision, + ); + snapshot.meta.lastModified = Number(commitResult?.lastModified || Date.now()); + snapshot.meta.lastMutationReason = String(reason || "graph-save"); + snapshot.meta.storagePrimary = localStore.storagePrimary; + snapshot.meta.storageMode = localStore.storageMode; + cacheIndexedDbSnapshot(normalizedChatId, snapshot); + } if (graph === currentGraph) { stampGraphPersistenceMeta(currentGraph, { - revision: snapshot.meta.revision, + revision: normalizeIndexedDbRevision(commitResult?.revision, requestedRevision), reason: String(reason || "graph-save"), chatId: normalizedChatId, integrity: @@ -10596,18 +10801,23 @@ async function saveGraphToIndexedDb( requestedRevision, requestedNative: nativePersistRequested, requestedBridgeMode: - persistDeltaBuildDiagnostics?.requestedBridgeMode || nativePersistBridgeMode, + persistDeltaBuildDiagnostics?.requestedBridgeMode || + (directPersistDelta ? "direct-delta" : nativePersistBridgeMode), buildRequestedNative: Boolean(persistDeltaBuildDiagnostics?.requestedNative), nativeForceDisabled: nativePersistForceDisabled, nativeFailOpen: currentSettings.nativeEngineFailOpen !== false, gateAllowed: - persistDeltaBuildDiagnostics?.gateAllowed ?? nativePersistGate.allowed, + persistDeltaBuildDiagnostics?.gateAllowed ?? + (directPersistDelta ? true : nativePersistGate.allowed), gateReasons: cloneRuntimeDebugValue( persistDeltaBuildDiagnostics?.gateReasons, - nativePersistGate.reasons, + directPersistDelta ? ["direct-delta"] : nativePersistGate.reasons, + ), + preloadGateAllowed: directPersistDelta ? true : nativePersistGate.allowed, + preloadGateReasons: cloneRuntimeDebugValue( + directPersistDelta ? ["direct-delta"] : nativePersistGate.reasons, + [], ), - preloadGateAllowed: nativePersistGate.allowed, - preloadGateReasons: cloneRuntimeDebugValue(nativePersistGate.reasons, []), minSnapshotRecords: nativePersistGate.minSnapshotRecords, minStructuralDelta: nativePersistGate.minStructuralDelta, minCombinedSerializedChars: @@ -10634,7 +10844,10 @@ async function saveGraphToIndexedDb( nativePersistModuleStatus?.error || nativePersistPreloadError || "", ), status: "committed", - commitRevision: snapshot.meta.revision, + commitRevision: normalizeIndexedDbRevision( + commitResult?.revision, + requestedRevision, + ), commitDelta: cloneRuntimeDebugValue(commitResult?.delta, null), totalMs: readPersistDeltaDiagnosticsNow() - persistDeltaStartedAt, }; @@ -10653,6 +10866,18 @@ async function saveGraphToIndexedDb( typeof db?.getWriteLockSnapshot === "function" ? cloneRuntimeDebugValue(db.getWriteLockSnapshot(), null) : null; + const localStoreDiagnostics = + typeof readLocalStoreDiagnosticsSync === "function" + ? readLocalStoreDiagnosticsSync(db, localStore) + : { + resolvedLocalStore: `${localStore?.storagePrimary || "indexeddb"}:${localStore?.storageMode || "indexeddb"}`, + localStoreFormatVersion: + localStore.storagePrimary === "opfs" ? 2 : 1, + localStoreMigrationState: "idle", + opfsWalDepth: 0, + opfsPendingBytes: 0, + opfsCompactionState: null, + }; if (persistRole === "cache-mirror") { updateGraphPersistenceState({ @@ -10662,16 +10887,28 @@ async function saveGraphToIndexedDb( cacheMirrorState: "saved", storagePrimary: localStore.storagePrimary, storageMode: localStore.storageMode, - indexedDbRevision: snapshot.meta.revision, + resolvedLocalStore: localStoreDiagnostics.resolvedLocalStore, + localStoreFormatVersion: localStoreDiagnostics.localStoreFormatVersion, + localStoreMigrationState: localStoreDiagnostics.localStoreMigrationState, + indexedDbRevision: normalizeIndexedDbRevision( + commitResult?.revision, + requestedRevision, + ), indexedDbLastError: "", lastSyncError: scheduleUploadWarning, opfsWriteLockState, + opfsWalDepth: localStoreDiagnostics.opfsWalDepth, + opfsPendingBytes: localStoreDiagnostics.opfsPendingBytes, + opfsCompactionState: localStoreDiagnostics.opfsCompactionState, dualWriteLastResult: { action: "cache-mirror", target: localStore.storagePrimary, success: true, chatId: normalizedChatId, - revision: snapshot.meta.revision, + revision: normalizeIndexedDbRevision( + commitResult?.revision, + requestedRevision, + ), reason: String(reason || "graph-save"), warning: scheduleUploadWarning || "", delta: cloneRuntimeDebugValue(commitResult?.delta, null), @@ -10684,7 +10921,10 @@ async function saveGraphToIndexedDb( accepted: false, mirrored: true, chatId: normalizedChatId, - revision: snapshot.meta.revision, + revision: normalizeIndexedDbRevision( + commitResult?.revision, + requestedRevision, + ), reason: String(reason || "graph-save"), saveMode: `${localStore.reasonPrefix}-cache-mirror`, storageTier: localStoreTier, @@ -10700,18 +10940,30 @@ async function saveGraphToIndexedDb( cacheStorageTier: persistenceEnvironment.cacheStorageTier, cacheMirrorState: persistenceEnvironment.hostProfile === "luker" ? "idle" : "none", - revision: snapshot.meta.revision, + revision: normalizeIndexedDbRevision( + commitResult?.revision, + requestedRevision, + ), storagePrimary: localStore.storagePrimary, storageMode: localStore.storageMode, + resolvedLocalStore: localStoreDiagnostics.resolvedLocalStore, + localStoreFormatVersion: localStoreDiagnostics.localStoreFormatVersion, + localStoreMigrationState: localStoreDiagnostics.localStoreMigrationState, dbReady: true, - lastPersistedRevision: snapshot.meta.revision, + lastPersistedRevision: normalizeIndexedDbRevision( + commitResult?.revision, + requestedRevision, + ), pendingPersist: false, queuedPersistRevision: 0, queuedPersistChatId: "", queuedPersistMode: "", queuedPersistRotateIntegrity: false, queuedPersistReason: "", - indexedDbRevision: snapshot.meta.revision, + indexedDbRevision: normalizeIndexedDbRevision( + commitResult?.revision, + requestedRevision, + ), metadataIntegrity: getChatMetadataIntegrity(context) || currentIdentity.integrity || @@ -10721,22 +10973,30 @@ async function saveGraphToIndexedDb( syncDirty: true, syncDirtyReason: String(reason || "graph-save"), lastPersistReason: String(reason || "graph-save"), - lastPersistMode: `${localStore.reasonPrefix}-delta`, + lastPersistMode: directPersistDelta + ? `${localStore.reasonPrefix}-direct-delta` + : `${localStore.reasonPrefix}-delta`, lastAcceptedRevision: Math.max( Number(graphPersistenceState.lastAcceptedRevision || 0), - snapshot.meta.revision, + normalizeIndexedDbRevision(commitResult?.revision, requestedRevision), ), acceptedStorageTier: localStoreTier, acceptedBy: localStoreTier, lastRecoverableStorageTier: "none", persistDiagnosticTier: "none", opfsWriteLockState, + opfsWalDepth: localStoreDiagnostics.opfsWalDepth, + opfsPendingBytes: localStoreDiagnostics.opfsPendingBytes, + opfsCompactionState: localStoreDiagnostics.opfsCompactionState, dualWriteLastResult: { action: "save", target: localStore.storagePrimary, success: true, chatId: normalizedChatId, - revision: snapshot.meta.revision, + revision: normalizeIndexedDbRevision( + commitResult?.revision, + requestedRevision, + ), reason: String(reason || "graph-save"), warning: scheduleUploadWarning || "", delta: cloneRuntimeDebugValue(commitResult?.delta, null), @@ -10755,15 +11015,21 @@ async function saveGraphToIndexedDb( applyGraphLoadState(GRAPH_LOAD_STATES.LOADED, { chatId: normalizedChatId, reason: `shadow-promoted:${String(reason || "graph-save")}`, - revision: snapshot.meta.revision, - lastPersistedRevision: snapshot.meta.revision, + revision: normalizeIndexedDbRevision( + commitResult?.revision, + requestedRevision, + ), + lastPersistedRevision: normalizeIndexedDbRevision( + commitResult?.revision, + requestedRevision, + ), queuedPersistRevision: 0, queuedPersistChatId: "", pendingPersist: false, shadowSnapshotUsed: true, shadowSnapshotRevision: Math.max( Number(graphPersistenceState.shadowSnapshotRevision || 0), - snapshot.meta.revision, + normalizeIndexedDbRevision(commitResult?.revision, requestedRevision), ), shadowSnapshotUpdatedAt: String( graphPersistenceState.shadowSnapshotUpdatedAt || "", @@ -10782,9 +11048,14 @@ async function saveGraphToIndexedDb( saved: true, accepted: true, chatId: normalizedChatId, - revision: snapshot.meta.revision, + revision: normalizeIndexedDbRevision( + commitResult?.revision, + requestedRevision, + ), reason: String(reason || "graph-save"), - saveMode: `${localStore.reasonPrefix}-delta`, + saveMode: directPersistDelta + ? `${localStore.reasonPrefix}-direct-delta` + : `${localStore.reasonPrefix}-delta`, storageTier: localStoreTier, warning: scheduleUploadWarning || "", delta: cloneRuntimeDebugValue(commitResult?.delta, null), @@ -10849,10 +11120,11 @@ function queueGraphPersistToIndexedDb( reason = "graph-save", persistRole = "primary", scheduleCloudUpload = undefined, + persistDelta = null, } = {}, ) { const normalizedChatId = normalizeChatIdCandidate(chatId); - if (!normalizedChatId || !graph) return; + if (!normalizedChatId || (!graph && !persistDelta)) return; if (persistRole === "cache-mirror") { const persistenceEnvironment = buildPersistenceEnvironment( @@ -10896,12 +11168,15 @@ function queueGraphPersistToIndexedDb( revision: normalizedRevision, }; } - const graphSnapshot = cloneGraphForPersistence(graph, normalizedChatId); + const graphSnapshot = graph + ? cloneGraphForPersistence(graph, normalizedChatId) + : null; return await saveGraphToIndexedDb(normalizedChatId, graphSnapshot, { revision: normalizedRevision, reason, persistRole, scheduleCloudUpload, + persistDelta, }); }) .finally(() => { @@ -13012,6 +13287,7 @@ async function executeExtractionBatch({ { appendBatchJournal, applyProcessedHistorySnapshotToGraph, + buildPersistDelta, buildExtractionMessages, cloneGraphSnapshot, computePostProcessArtifacts, diff --git a/maintenance/extraction-controller.js b/maintenance/extraction-controller.js index b769749..3443de3 100644 --- a/maintenance/extraction-controller.js +++ b/maintenance/extraction-controller.js @@ -332,6 +332,12 @@ function buildCommittedBatchPersistSnapshot( } return { + persistDelta: + typeof runtime.buildPersistDelta === "function" + ? runtime.buildPersistDelta(beforeSnapshot, committedGraphSnapshot, { + useNativeDelta: false, + }) + : null, persistGraphSnapshot: committedGraphSnapshot, committedBatchJournalEntry, afterSnapshot, @@ -633,6 +639,7 @@ export async function executeExtractionBatchController( reason: "extraction-batch-complete", lastProcessedAssistantFloor: endIdx, graphSnapshot: committedPersistState.persistGraphSnapshot, + persistDelta: committedPersistState.persistDelta, }); const persistence = normalizePersistenceStateRecord(persistResult); batchStatusRef.persistence = persistence; diff --git a/runtime/settings-defaults.js b/runtime/settings-defaults.js index 2d23811..b9b54ca 100644 --- a/runtime/settings-defaults.js +++ b/runtime/settings-defaults.js @@ -168,7 +168,7 @@ export const defaultSettings = { // UI 面板 noticeDisplayMode: "normal", panelTheme: "crimson", - graphLocalStorageMode: "indexeddb", + graphLocalStorageMode: "auto", cloudStorageMode: "automatic", }; diff --git a/sync/bme-db.js b/sync/bme-db.js index f050277..e8c8d20 100644 --- a/sync/bme-db.js +++ b/sync/bme-db.js @@ -1542,6 +1542,30 @@ function buildPersistDeltaFromIdShape(preparedContext, delta = null) { }; } +function buildPersistCountDelta(beforeSnapshot = {}, afterSnapshot = {}) { + const normalizedBefore = normalizePersistSnapshotView(beforeSnapshot); + const normalizedAfter = normalizePersistSnapshotView(afterSnapshot); + const previous = { + nodes: toArray(normalizedBefore.nodes).length, + edges: toArray(normalizedBefore.edges).length, + tombstones: toArray(normalizedBefore.tombstones).length, + }; + const next = { + nodes: toArray(normalizedAfter.nodes).length, + edges: toArray(normalizedAfter.edges).length, + tombstones: toArray(normalizedAfter.tombstones).length, + }; + return { + previous, + next, + delta: { + nodes: next.nodes - previous.nodes, + edges: next.edges - previous.edges, + tombstones: next.tombstones - previous.tombstones, + }, + }; +} + function readPersistDeltaNow() { if (typeof performance === "object" && typeof performance.now === "function") { return performance.now(); @@ -1710,6 +1734,7 @@ export function buildPersistDelta(beforeSnapshot, afterSnapshot, options = {}) { if (nativeDelta) { const result = { ...nativeDelta, + countDelta: buildPersistCountDelta(normalizedBefore, normalizedAfter), runtimeMetaPatch: { ...buildRuntimeMetaPatch(normalizedAfter), ...nativeDelta.runtimeMetaPatch, @@ -1876,6 +1901,7 @@ export function buildPersistDelta(beforeSnapshot, afterSnapshot, options = {}) { deleteNodeIds, deleteEdgeIds, tombstones: Array.from(tombstoneMap.values()), + countDelta: buildPersistCountDelta(normalizedBefore, normalizedAfter), runtimeMetaPatch: { ...buildRuntimeMetaPatch(normalizedAfter), ...(options.runtimeMetaPatch && @@ -2457,6 +2483,12 @@ export class BmeDatabase { const reason = String(options.reason || "commitDelta"); const requestedRevision = normalizeRevision(options.requestedRevision); const shouldMarkSyncDirty = options.markSyncDirty !== false; + const normalizedCountDelta = + normalizedDelta.countDelta && + typeof normalizedDelta.countDelta === "object" && + !Array.isArray(normalizedDelta.countDelta) + ? normalizedDelta.countDelta + : {}; let nextRevision = 0; let counts = { @@ -2494,7 +2526,7 @@ export class BmeDatabase { await this._setMetaInTx(db, key, value, nowMs); } - counts = await this._updateCountMetaInTx(db, nowMs); + counts = await this._applyCountDeltaMetaInTx(db, normalizedCountDelta, nowMs); const currentRevision = normalizeRevision( (await db.table("meta").get("revision"))?.value, ); @@ -3276,6 +3308,69 @@ export class BmeDatabase { }; } + async _applyCountDeltaMetaInTx( + db, + countDelta = null, + nowMs = Date.now(), + ) { + const nextCounts = + countDelta?.next && + typeof countDelta.next === "object" && + !Array.isArray(countDelta.next) + ? countDelta.next + : null; + if (nextCounts) { + const nodes = normalizeNonNegativeInteger(nextCounts.nodes, 0); + const edges = normalizeNonNegativeInteger(nextCounts.edges, 0); + const tombstones = normalizeNonNegativeInteger(nextCounts.tombstones, 0); + await this._setMetaInTx(db, "nodeCount", nodes, nowMs); + await this._setMetaInTx(db, "edgeCount", edges, nowMs); + await this._setMetaInTx(db, "tombstoneCount", tombstones, nowMs); + return { + nodes, + edges, + tombstones, + }; + } + + const previousCounts = + countDelta?.previous && + typeof countDelta.previous === "object" && + !Array.isArray(countDelta.previous) + ? countDelta.previous + : null; + const deltaCounts = + countDelta?.delta && + typeof countDelta.delta === "object" && + !Array.isArray(countDelta.delta) + ? countDelta.delta + : null; + if (previousCounts && deltaCounts) { + const nodes = normalizeNonNegativeInteger( + Number(previousCounts.nodes || 0) + Number(deltaCounts.nodes || 0), + 0, + ); + const edges = normalizeNonNegativeInteger( + Number(previousCounts.edges || 0) + Number(deltaCounts.edges || 0), + 0, + ); + const tombstones = normalizeNonNegativeInteger( + Number(previousCounts.tombstones || 0) + Number(deltaCounts.tombstones || 0), + 0, + ); + await this._setMetaInTx(db, "nodeCount", nodes, nowMs); + await this._setMetaInTx(db, "edgeCount", edges, nowMs); + await this._setMetaInTx(db, "tombstoneCount", tombstones, nowMs); + return { + nodes, + edges, + tombstones, + }; + } + + return await this._updateCountMetaInTx(db, nowMs); + } + _applyListOptions(records, options = {}) { let nextRecords = toArray(records); diff --git a/sync/bme-opfs-store.js b/sync/bme-opfs-store.js index 78f969f..de50d04 100644 --- a/sync/bme-opfs-store.js +++ b/sync/bme-opfs-store.js @@ -16,8 +16,17 @@ const OPFS_CHATS_DIRECTORY_NAME = "chats"; const OPFS_MANIFEST_FILENAME = "manifest.json"; const OPFS_MANIFEST_VERSION = 1; const OPFS_STORE_KIND = "opfs"; +const OPFS_FORMAT_VERSION_V2 = 2; const OPFS_CORE_FILENAME_PREFIX = "core.snapshot"; const OPFS_AUX_FILENAME_PREFIX = "aux.snapshot"; +const OPFS_V2_META_DIRECTORY = "meta"; +const OPFS_V2_SHARDS_DIRECTORY = "shards"; +const OPFS_V2_WAL_DIRECTORY = "wal"; +const OPFS_V2_NODE_BUCKET_COUNT = 64; +const OPFS_V2_EDGE_BUCKET_COUNT = 128; +const OPFS_V2_TOMBSTONE_BUCKET_COUNT = 16; +const OPFS_V2_WAL_COMPACTION_THRESHOLD = 64; +const OPFS_V2_WAL_BYTES_THRESHOLD = 16 * 1024 * 1024; const OPFS_MANIFEST_META_KEYS = new Set([ "chatId", "revision", @@ -58,6 +67,7 @@ const OPFS_AUX_META_KEYS = new Set([ ]); export const BME_GRAPH_LOCAL_STORAGE_MODE_INDEXEDDB = "indexeddb"; +export const BME_GRAPH_LOCAL_STORAGE_MODE_AUTO = "auto"; export const BME_GRAPH_LOCAL_STORAGE_MODE_OPFS_SHADOW = "opfs-shadow"; export const BME_GRAPH_LOCAL_STORAGE_MODE_OPFS_PRIMARY = "opfs-primary"; @@ -94,6 +104,14 @@ function normalizeSourceFloor(value) { return Math.floor(parsed); } +function normalizeNonNegativeInteger(value, fallback = 0) { + const parsed = Number(value); + if (!Number.isFinite(parsed) || parsed < 0) { + return Math.max(0, Math.floor(Number(fallback) || 0)); + } + return Math.floor(parsed); +} + function deriveNodeSourceFloor(node = {}) { const directSourceFloor = normalizeSourceFloor(node?.sourceFloor); if (directSourceFloor != null) return directSourceFloor; @@ -231,6 +249,9 @@ function normalizeGraphLocalStorageModeInternal( fallbackValue = BME_GRAPH_LOCAL_STORAGE_MODE_INDEXEDDB, ) { const normalized = String(value || "").trim().toLowerCase(); + if (normalized === BME_GRAPH_LOCAL_STORAGE_MODE_AUTO) { + return BME_GRAPH_LOCAL_STORAGE_MODE_AUTO; + } if (normalized === BME_GRAPH_LOCAL_STORAGE_MODE_INDEXEDDB) { return BME_GRAPH_LOCAL_STORAGE_MODE_INDEXEDDB; } @@ -242,6 +263,9 @@ function normalizeGraphLocalStorageModeInternal( function normalizeGraphLocalStorageModeInternalFallback(value) { const normalized = String(value || "").trim().toLowerCase(); + if (normalized === BME_GRAPH_LOCAL_STORAGE_MODE_AUTO) { + return BME_GRAPH_LOCAL_STORAGE_MODE_AUTO; + } if (OPFS_ENABLED_MODES.has(normalized)) { return normalized; } @@ -575,7 +599,7 @@ export async function detectOpfsSupport(options = {}) { } } -export class OpfsGraphStore { +class LegacyOpfsGraphStore { constructor(chatId, options = {}) { this.chatId = normalizeChatId(chatId); this.options = options; @@ -1615,3 +1639,1587 @@ export class OpfsGraphStore { return buildSnapshotFromStoredParts(manifest, corePayload, auxPayload); } } + +function hashRecordIdToBucket(id = "", bucketCount = 1) { + const normalizedId = normalizeRecordId(id); + const normalizedBucketCount = Math.max(1, Math.floor(Number(bucketCount) || 1)); + let hash = 2166136261; + for (let index = 0; index < normalizedId.length; index += 1) { + hash ^= normalizedId.charCodeAt(index); + hash = Math.imul(hash, 16777619); + } + return Math.abs(hash >>> 0) % normalizedBucketCount; +} + +function buildOpfsV2ShardFilename(kind = "nodes", bucketIndex = 0) { + return `${String(kind || "records")}.${Math.max(0, Math.floor(Number(bucketIndex) || 0)) + .toString(16) + .padStart(2, "0")}.json`; +} + +function buildOpfsV2WalFilename(revision = 0) { + return `commit.${normalizeRevision(revision)}.json`; +} + +function buildOpfsV2MetaFilename(key = "") { + return `meta.${encodeURIComponent(normalizeRecordId(key))}.json`; +} + +function parseOpfsV2WalFilename(name = "") { + const match = String(name || "").trim().match(/^commit\.(\d+)\.json$/); + if (!match) return null; + return { + filename: String(name || "").trim(), + revision: normalizeRevision(match[1]), + }; +} + +function parseOpfsV2MetaFilename(name = "") { + const match = String(name || "").trim().match(/^meta\.(.+)\.json$/); + if (!match) return null; + return normalizeRecordId(decodeURIComponent(match[1] || "")); +} + +function normalizeOpfsV2CountDelta(countDelta = null) { + const next = + countDelta?.next && typeof countDelta.next === "object" && !Array.isArray(countDelta.next) + ? countDelta.next + : null; + const previous = + countDelta?.previous && + typeof countDelta.previous === "object" && + !Array.isArray(countDelta.previous) + ? countDelta.previous + : null; + const delta = + countDelta?.delta && typeof countDelta.delta === "object" && !Array.isArray(countDelta.delta) + ? countDelta.delta + : null; + return { + previous: { + nodes: normalizeNonNegativeInteger(previous?.nodes, 0), + edges: normalizeNonNegativeInteger(previous?.edges, 0), + tombstones: normalizeNonNegativeInteger(previous?.tombstones, 0), + }, + next: { + nodes: normalizeNonNegativeInteger(next?.nodes, 0), + edges: normalizeNonNegativeInteger(next?.edges, 0), + tombstones: normalizeNonNegativeInteger(next?.tombstones, 0), + }, + delta: { + nodes: Number.isFinite(Number(delta?.nodes)) + ? Math.trunc(Number(delta.nodes)) + : normalizeNonNegativeInteger(next?.nodes, 0) - + normalizeNonNegativeInteger(previous?.nodes, 0), + edges: Number.isFinite(Number(delta?.edges)) + ? Math.trunc(Number(delta.edges)) + : normalizeNonNegativeInteger(next?.edges, 0) - + normalizeNonNegativeInteger(previous?.edges, 0), + tombstones: Number.isFinite(Number(delta?.tombstones)) + ? Math.trunc(Number(delta.tombstones)) + : normalizeNonNegativeInteger(next?.tombstones, 0) - + normalizeNonNegativeInteger(previous?.tombstones, 0), + }, + }; +} + +function sanitizeOpfsV2Delta(delta = {}, nowMs = Date.now()) { + const normalizedDelta = + delta && typeof delta === "object" && !Array.isArray(delta) ? delta : {}; + return { + upsertNodes: sanitizeSnapshotRecordArray(normalizedDelta.upsertNodes).map((node) => ({ + ...node, + id: normalizeRecordId(node.id), + updatedAt: normalizeTimestamp(node.updatedAt, nowMs), + })), + upsertEdges: sanitizeSnapshotRecordArray(normalizedDelta.upsertEdges).map((edge) => ({ + ...edge, + id: normalizeRecordId(edge.id), + fromId: normalizeRecordId(edge.fromId), + toId: normalizeRecordId(edge.toId), + updatedAt: normalizeTimestamp(edge.updatedAt, nowMs), + })), + deleteNodeIds: toArray(normalizedDelta.deleteNodeIds) + .map((value) => normalizeRecordId(value)) + .filter(Boolean), + deleteEdgeIds: toArray(normalizedDelta.deleteEdgeIds) + .map((value) => normalizeRecordId(value)) + .filter(Boolean), + tombstones: sanitizeSnapshotRecordArray(normalizedDelta.tombstones).map((tombstone) => ({ + ...tombstone, + id: normalizeRecordId(tombstone.id), + kind: normalizeRecordId(tombstone.kind), + targetId: normalizeRecordId(tombstone.targetId), + sourceDeviceId: normalizeRecordId(tombstone.sourceDeviceId), + deletedAt: normalizeTimestamp(tombstone.deletedAt, nowMs), + })), + runtimeMetaPatch: + normalizedDelta.runtimeMetaPatch && + typeof normalizedDelta.runtimeMetaPatch === "object" && + !Array.isArray(normalizedDelta.runtimeMetaPatch) + ? toPlainData(normalizedDelta.runtimeMetaPatch, {}) + : {}, + countDelta: normalizeOpfsV2CountDelta(normalizedDelta.countDelta), + }; +} + +function applyOpfsV2DeltaToSnapshot(snapshot = {}, delta = {}, nowMs = Date.now()) { + const nextSnapshot = sanitizeSnapshot(snapshot); + const normalizedDelta = sanitizeOpfsV2Delta(delta, nowMs); + const nodeMap = new Map( + sanitizeSnapshotRecordArray(nextSnapshot.nodes).map((record) => [ + normalizeRecordId(record.id), + record, + ]), + ); + const edgeMap = new Map( + sanitizeSnapshotRecordArray(nextSnapshot.edges).map((record) => [ + normalizeRecordId(record.id), + record, + ]), + ); + const tombstoneMap = new Map( + sanitizeSnapshotRecordArray(nextSnapshot.tombstones).map((record) => [ + normalizeRecordId(record.id), + record, + ]), + ); + + for (const nodeId of normalizedDelta.deleteNodeIds) { + nodeMap.delete(nodeId); + } + for (const edgeId of normalizedDelta.deleteEdgeIds) { + edgeMap.delete(edgeId); + } + for (const node of normalizedDelta.upsertNodes) { + if (!node.id) continue; + nodeMap.set(node.id, node); + } + for (const edge of normalizedDelta.upsertEdges) { + if (!edge.id) continue; + edgeMap.set(edge.id, edge); + } + for (const tombstone of normalizedDelta.tombstones) { + if (!tombstone.id) continue; + tombstoneMap.set(tombstone.id, tombstone); + } + + nextSnapshot.nodes = Array.from(nodeMap.values()); + nextSnapshot.edges = Array.from(edgeMap.values()); + nextSnapshot.tombstones = Array.from(tombstoneMap.values()); + nextSnapshot.meta = { + ...(nextSnapshot.meta || {}), + ...(normalizedDelta.runtimeMetaPatch || {}), + nodeCount: nextSnapshot.nodes.length, + edgeCount: nextSnapshot.edges.length, + tombstoneCount: nextSnapshot.tombstones.length, + }; + nextSnapshot.state = normalizeSnapshotState(nextSnapshot); + if (Object.prototype.hasOwnProperty.call(normalizedDelta.runtimeMetaPatch, "lastProcessedFloor")) { + nextSnapshot.state.lastProcessedFloor = Number.isFinite( + Number(normalizedDelta.runtimeMetaPatch.lastProcessedFloor), + ) + ? Number(normalizedDelta.runtimeMetaPatch.lastProcessedFloor) + : META_DEFAULT_LAST_PROCESSED_FLOOR; + } + if (Object.prototype.hasOwnProperty.call(normalizedDelta.runtimeMetaPatch, "extractionCount")) { + nextSnapshot.state.extractionCount = Number.isFinite( + Number(normalizedDelta.runtimeMetaPatch.extractionCount), + ) + ? Number(normalizedDelta.runtimeMetaPatch.extractionCount) + : META_DEFAULT_EXTRACTION_COUNT; + } + nextSnapshot.meta.lastProcessedFloor = nextSnapshot.state.lastProcessedFloor; + nextSnapshot.meta.extractionCount = nextSnapshot.state.extractionCount; + return nextSnapshot; +} + +function splitOpfsV2SnapshotMeta(meta = {}) { + const manifestMeta = {}; + const runtimeMeta = {}; + for (const [rawKey, value] of Object.entries(meta || {})) { + const key = normalizeRecordId(rawKey); + if (!key) continue; + const clonedValue = toPlainData(value, value); + if (OPFS_MANIFEST_META_KEYS.has(key)) { + manifestMeta[key] = clonedValue; + continue; + } + runtimeMeta[key] = clonedValue; + } + return { + manifestMeta, + runtimeMeta, + }; +} + +function createEmptyOpfsV2Manifest(chatId = "", storeMode = BME_GRAPH_LOCAL_STORAGE_MODE_OPFS_PRIMARY) { + return { + version: OPFS_MANIFEST_VERSION, + formatVersion: OPFS_FORMAT_VERSION_V2, + chatId: normalizeChatId(chatId), + storeKind: OPFS_STORE_KIND, + storeMode: normalizeGraphLocalStorageMode( + storeMode, + BME_GRAPH_LOCAL_STORAGE_MODE_OPFS_PRIMARY, + ), + baseRevision: 0, + headRevision: 0, + lastCompactedRevision: 0, + pendingLogFromRevision: 1, + shardLayout: { + nodes: OPFS_V2_NODE_BUCKET_COUNT, + edges: OPFS_V2_EDGE_BUCKET_COUNT, + tombstones: OPFS_V2_TOMBSTONE_BUCKET_COUNT, + }, + wal: { + count: 0, + totalBytes: 0, + }, + compaction: { + state: "idle", + queued: false, + lastAt: 0, + lastReason: "", + }, + meta: createDefaultMetaValues(chatId), + }; +} + +function isOpfsV2Manifest(manifest = null) { + return Number(manifest?.formatVersion || 0) === OPFS_FORMAT_VERSION_V2; +} + +function groupOpfsV2RecordsByBucket(records = [], bucketCount = 1) { + const bucketMap = new Map(); + for (const record of sanitizeSnapshotRecordArray(records)) { + const id = normalizeRecordId(record.id); + if (!id) continue; + const bucketIndex = hashRecordIdToBucket(id, bucketCount); + const bucketRecords = bucketMap.get(bucketIndex) || []; + bucketRecords.push({ ...record, id }); + bucketMap.set(bucketIndex, bucketRecords); + } + return bucketMap; +} + +function buildOpfsV2IntegritySummary(snapshot = {}) { + return { + nodeCount: normalizeNonNegativeInteger(snapshot?.nodes?.length, 0), + edgeCount: normalizeNonNegativeInteger(snapshot?.edges?.length, 0), + tombstoneCount: normalizeNonNegativeInteger(snapshot?.tombstones?.length, 0), + revision: normalizeRevision(snapshot?.meta?.revision), + }; +} + +export class OpfsGraphStore { + constructor(chatId, options = {}) { + this.chatId = normalizeChatId(chatId); + this.options = options; + this.storeKind = OPFS_STORE_KIND; + const normalizedStoreMode = normalizeGraphLocalStorageMode( + options.storeMode, + BME_GRAPH_LOCAL_STORAGE_MODE_OPFS_PRIMARY, + ); + this.storeMode = + normalizedStoreMode === BME_GRAPH_LOCAL_STORAGE_MODE_OPFS_SHADOW + ? BME_GRAPH_LOCAL_STORAGE_MODE_OPFS_PRIMARY + : normalizedStoreMode; + this._rootDirectoryFactory = + typeof options.rootDirectoryFactory === "function" + ? options.rootDirectoryFactory + : getDefaultOpfsRootDirectory; + this._chatDirectoryPromise = null; + this._manifestCache = null; + this._snapshotCache = null; + this._writeChain = Promise.resolve(); + this._writeQueueDepth = 0; + this._writeLockState = { + active: false, + queueDepth: 0, + lastReason: "", + updatedAt: 0, + }; + this._compactionScheduled = false; + } + + async open() { + await this._ensureV2Ready(); + return this; + } + + async close() { + this._chatDirectoryPromise = null; + this._manifestCache = null; + this._snapshotCache = null; + this._writeChain = Promise.resolve(); + this._writeQueueDepth = 0; + this._compactionScheduled = false; + this._writeLockState = { + active: false, + queueDepth: 0, + lastReason: "", + updatedAt: 0, + }; + } + + getWriteLockSnapshot() { + return toPlainData(this._writeLockState, this._writeLockState); + } + + async _awaitPendingWrites() { + try { + await this._writeChain; + } catch { + // ignore previous write failure for read barrier + } + } + + _setWriteLockState(patch = {}) { + this._writeLockState = { + ...this._writeLockState, + ...(patch || {}), + updatedAt: Date.now(), + }; + return this._writeLockState; + } + + async _runSerializedWrite(reason = "opfs-v2-write", task = null) { + if (typeof task !== "function") { + throw new Error("OpfsGraphStore serialized write task is required"); + } + this._writeQueueDepth += 1; + this._setWriteLockState({ + active: true, + queueDepth: this._writeQueueDepth, + lastReason: String(reason || "opfs-v2-write"), + }); + const runTask = async () => { + try { + return await task(); + } finally { + this._writeQueueDepth = Math.max(0, this._writeQueueDepth - 1); + this._setWriteLockState({ + active: this._writeQueueDepth > 0, + queueDepth: this._writeQueueDepth, + lastReason: String(reason || "opfs-v2-write"), + }); + } + }; + const nextWrite = this._writeChain.catch(() => null).then(runTask); + this._writeChain = nextWrite.catch(() => null); + return await nextWrite; + } + + async getMeta(key, fallbackValue = null) { + const normalizedKey = normalizeRecordId(key); + if (!normalizedKey) return fallbackValue; + const manifest = await this._ensureV2Ready(); + if (OPFS_MANIFEST_META_KEYS.has(normalizedKey)) { + const manifestMeta = + manifest?.meta && typeof manifest.meta === "object" && !Array.isArray(manifest.meta) + ? manifest.meta + : {}; + return Object.prototype.hasOwnProperty.call(manifestMeta, normalizedKey) + ? manifestMeta[normalizedKey] + : fallbackValue; + } + const snapshot = await this._loadSnapshot(); + return Object.prototype.hasOwnProperty.call(snapshot.meta, normalizedKey) + ? snapshot.meta[normalizedKey] + : fallbackValue; + } + + async setMeta(key, value) { + const normalizedKey = normalizeRecordId(key); + if (!normalizedKey) return null; + await this.patchMeta({ + [normalizedKey]: value, + }); + return { + key: normalizedKey, + value: await this.getMeta(normalizedKey, null), + updatedAt: Date.now(), + }; + } + + async patchMeta(record) { + if (!record || typeof record !== "object" || Array.isArray(record)) { + return {}; + } + const entries = Object.entries(record) + .map(([rawKey, value]) => [normalizeRecordId(rawKey), toPlainData(value, value)]) + .filter(([key]) => Boolean(key)); + if (!entries.length) { + return {}; + } + return await this._runSerializedWrite("patchMeta", async () => { + const manifest = await this._ensureV2Ready({ awaitWrites: false }); + const manifestPatch = {}; + const runtimePatch = {}; + for (const [key, value] of entries) { + if (OPFS_MANIFEST_META_KEYS.has(key)) { + manifestPatch[key] = value; + } else { + runtimePatch[key] = value; + } + } + + if (Object.keys(runtimePatch).length > 0) { + await this._writeRuntimeMetaEntries(runtimePatch); + } + + if (Object.keys(manifestPatch).length > 0) { + const nextManifest = { + ...manifest, + meta: { + ...createDefaultMetaValues(this.chatId), + ...(manifest.meta || {}), + ...manifestPatch, + chatId: this.chatId, + schemaVersion: BME_DB_SCHEMA_VERSION, + storagePrimary: OPFS_STORE_KIND, + storageMode: this.storeMode, + }, + }; + await this._writeManifest(nextManifest); + } + + if (this._snapshotCache) { + this._snapshotCache.meta = { + ...this._snapshotCache.meta, + ...manifestPatch, + ...runtimePatch, + }; + this._snapshotCache.state = { + ...normalizeSnapshotState(this._snapshotCache), + ...(Object.prototype.hasOwnProperty.call(manifestPatch, "lastProcessedFloor") + ? { + lastProcessedFloor: Number.isFinite( + Number(manifestPatch.lastProcessedFloor), + ) + ? Number(manifestPatch.lastProcessedFloor) + : META_DEFAULT_LAST_PROCESSED_FLOOR, + } + : {}), + ...(Object.prototype.hasOwnProperty.call(manifestPatch, "extractionCount") + ? { + extractionCount: Number.isFinite(Number(manifestPatch.extractionCount)) + ? Number(manifestPatch.extractionCount) + : META_DEFAULT_EXTRACTION_COUNT, + } + : {}), + }; + this._snapshotCache.meta.lastProcessedFloor = + this._snapshotCache.state.lastProcessedFloor; + this._snapshotCache.meta.extractionCount = + this._snapshotCache.state.extractionCount; + } + + return Object.fromEntries(entries); + }); + } + + async getRevision() { + return normalizeRevision(await this.getMeta("revision", 0)); + } + + async markSyncDirty(reason = "mutation") { + await this.patchMeta({ + syncDirty: true, + syncDirtyReason: String(reason || "mutation"), + }); + return true; + } + + getStorageDiagnosticsSync() { + const manifest = this._manifestCache || null; + return { + formatVersion: isOpfsV2Manifest(manifest) ? OPFS_FORMAT_VERSION_V2 : 1, + walCount: normalizeNonNegativeInteger(manifest?.wal?.count, 0), + walTotalBytes: normalizeNonNegativeInteger(manifest?.wal?.totalBytes, 0), + baseRevision: normalizeRevision(manifest?.baseRevision || 0), + headRevision: normalizeRevision( + manifest?.headRevision || manifest?.meta?.revision || 0, + ), + lastCompactedRevision: normalizeRevision( + manifest?.lastCompactedRevision || 0, + ), + pendingLogFromRevision: normalizeRevision( + manifest?.pendingLogFromRevision || 0, + ), + compactionState: toPlainData(manifest?.compaction || {}, {}), + resolvedStoreMode: this.storeMode, + }; + } + + async commitDelta(delta = {}, options = {}) { + return await this._runSerializedWrite( + String(options?.reason || "commitDelta"), + async () => { + const manifest = await this._ensureV2Ready({ awaitWrites: false }); + const nowMs = Date.now(); + const normalizedDelta = sanitizeOpfsV2Delta(delta, nowMs); + const requestedRevision = normalizeRevision(options.requestedRevision); + const shouldMarkSyncDirty = options.markSyncDirty !== false; + const reason = String(options.reason || "commitDelta"); + const currentHeadRevision = normalizeRevision( + manifest?.headRevision || manifest?.meta?.revision, + ); + const nextRevision = Math.max(currentHeadRevision + 1, requestedRevision); + const nextCountDelta = normalizeOpfsV2CountDelta(normalizedDelta.countDelta); + const nextMeta = { + ...createDefaultMetaValues(this.chatId), + ...(manifest?.meta || {}), + ...Object.fromEntries( + Object.entries(normalizedDelta.runtimeMetaPatch).filter(([key]) => + OPFS_MANIFEST_META_KEYS.has(normalizeRecordId(key)), + ), + ), + chatId: this.chatId, + revision: nextRevision, + lastModified: nowMs, + lastMutationReason: reason, + syncDirty: shouldMarkSyncDirty, + syncDirtyReason: shouldMarkSyncDirty ? reason : "", + storagePrimary: OPFS_STORE_KIND, + storageMode: this.storeMode, + nodeCount: normalizeNonNegativeInteger(nextCountDelta.next.nodes, 0), + edgeCount: normalizeNonNegativeInteger(nextCountDelta.next.edges, 0), + tombstoneCount: normalizeNonNegativeInteger(nextCountDelta.next.tombstones, 0), + }; + const walRecord = { + version: OPFS_MANIFEST_VERSION, + formatVersion: OPFS_FORMAT_VERSION_V2, + revision: nextRevision, + reason, + committedAt: nowMs, + delta: normalizedDelta, + runtimeMetaPatch: normalizedDelta.runtimeMetaPatch, + countDelta: nextCountDelta, + }; + const walDirectory = await this._getWalDirectory(); + const walFilename = buildOpfsV2WalFilename(nextRevision); + await writeJsonFile(walDirectory, walFilename, walRecord); + const walByteLength = JSON.stringify(walRecord).length; + + const hadPendingWal = + normalizeRevision(manifest?.pendingLogFromRevision) <= currentHeadRevision; + const nextManifest = { + ...manifest, + formatVersion: OPFS_FORMAT_VERSION_V2, + chatId: this.chatId, + storeKind: OPFS_STORE_KIND, + storeMode: this.storeMode, + headRevision: nextRevision, + pendingLogFromRevision: hadPendingWal + ? normalizeRevision(manifest?.pendingLogFromRevision || nextRevision) + : nextRevision, + wal: { + count: normalizeNonNegativeInteger(manifest?.wal?.count, 0) + 1, + totalBytes: + normalizeNonNegativeInteger(manifest?.wal?.totalBytes, 0) + walByteLength, + }, + meta: nextMeta, + compaction: { + ...(manifest?.compaction || {}), + state: "pending", + queued: false, + lastReason: reason, + }, + }; + await this._writeManifest(nextManifest); + + if (this._snapshotCache) { + const nextSnapshot = applyOpfsV2DeltaToSnapshot( + this._snapshotCache, + normalizedDelta, + nowMs, + ); + nextSnapshot.meta = { + ...nextSnapshot.meta, + ...nextMeta, + }; + nextSnapshot.state = normalizeSnapshotState(nextSnapshot); + this._snapshotCache = nextSnapshot; + } + + this._maybeScheduleCompaction(nextManifest, reason); + + return { + revision: nextRevision, + lastModified: nowMs, + imported: { + nodes: nextMeta.nodeCount, + edges: nextMeta.edgeCount, + tombstones: nextMeta.tombstoneCount, + }, + delta: { + upsertNodes: normalizedDelta.upsertNodes.length, + upsertEdges: normalizedDelta.upsertEdges.length, + deleteNodeIds: normalizedDelta.deleteNodeIds.length, + deleteEdgeIds: normalizedDelta.deleteEdgeIds.length, + tombstones: normalizedDelta.tombstones.length, + }, + }; + }, + ); + } + + async bulkUpsertNodes(nodes = []) { + const records = sanitizeSnapshotRecordArray(nodes); + if (!records.length) { + return { + upserted: 0, + revision: await this.getRevision(), + }; + } + const result = await this.commitDelta( + { + upsertNodes: records, + }, + { + reason: "bulkUpsertNodes", + }, + ); + return { + upserted: records.length, + revision: result.revision, + }; + } + + async bulkUpsertEdges(edges = []) { + const records = sanitizeSnapshotRecordArray(edges); + if (!records.length) { + return { + upserted: 0, + revision: await this.getRevision(), + }; + } + const result = await this.commitDelta( + { + upsertEdges: records, + }, + { + reason: "bulkUpsertEdges", + }, + ); + return { + upserted: records.length, + revision: result.revision, + }; + } + + async bulkUpsertTombstones(tombstones = []) { + const records = sanitizeSnapshotRecordArray(tombstones); + if (!records.length) { + return { + upserted: 0, + revision: await this.getRevision(), + }; + } + const result = await this.commitDelta( + { + tombstones: records, + }, + { + reason: "bulkUpsertTombstones", + }, + ); + return { + upserted: records.length, + revision: result.revision, + }; + } + + async listNodes(options = {}) { + const snapshot = await this._loadSnapshot(); + let records = sanitizeSnapshotRecordArray(snapshot.nodes); + const includeDeleted = options.includeDeleted !== false; + const includeArchived = options.includeArchived !== false; + if (!includeDeleted) { + records = records.filter((node) => !Number.isFinite(Number(node?.deletedAt))); + } + if (!includeArchived) { + records = records.filter((node) => node?.archived !== true); + } + return applyListOptions(records, options); + } + + async listEdges(options = {}) { + const snapshot = await this._loadSnapshot(); + let records = sanitizeSnapshotRecordArray(snapshot.edges); + const includeDeleted = options.includeDeleted !== false; + if (!includeDeleted) { + records = records.filter((edge) => !Number.isFinite(Number(edge?.deletedAt))); + } + return applyListOptions(records, options); + } + + async listTombstones(options = {}) { + const snapshot = await this._loadSnapshot(); + return applyListOptions(snapshot.tombstones, options); + } + + async isEmpty(options = {}) { + const snapshot = await this._loadSnapshot(); + const includeTombstones = options.includeTombstones === true; + const nodes = snapshot.nodes.length; + const edges = snapshot.edges.length; + const tombstones = snapshot.tombstones.length; + return { + empty: includeTombstones + ? nodes === 0 && edges === 0 && tombstones === 0 + : nodes === 0 && edges === 0, + nodes, + edges, + tombstones, + includeTombstones, + }; + } + + async importLegacyGraph(legacyGraph, options = {}) { + const nowMs = normalizeTimestamp(options.nowMs, Date.now()); + const migrationSource = + normalizeRecordId(options.source || "chat_metadata") || "chat_metadata"; + const requestedRetentionMs = Number(options.legacyRetentionMs); + const legacyRetentionMs = + Number.isFinite(requestedRetentionMs) && requestedRetentionMs >= 0 + ? Math.floor(requestedRetentionMs) + : BME_LEGACY_RETENTION_MS; + const legacyRetentionUntil = nowMs + legacyRetentionMs; + const migrationCompletedAt = normalizeTimestamp( + await this.getMeta("migrationCompletedAt", 0), + 0, + ); + if (migrationCompletedAt > 0) { + return { + migrated: false, + skipped: true, + reason: "migration-already-completed", + revision: await this.getRevision(), + imported: { + nodes: (await this.listNodes()).length, + edges: (await this.listEdges()).length, + tombstones: (await this.listTombstones()).length, + }, + migrationCompletedAt, + migrationSource, + legacyRetentionUntil: normalizeTimestamp( + await this.getMeta("legacyRetentionUntil", 0), + 0, + ), + }; + } + const emptyStatus = await this.isEmpty(); + if (!emptyStatus?.empty) { + return { + migrated: false, + skipped: true, + reason: "local-store-not-empty", + revision: await this.getRevision(), + imported: { + nodes: emptyStatus.nodes, + edges: emptyStatus.edges, + tombstones: emptyStatus.tombstones, + }, + migrationCompletedAt: 0, + migrationSource, + legacyRetentionUntil, + }; + } + const runtimeLegacyGraph = normalizeGraphRuntimeState( + deserializeGraph(toPlainData(legacyGraph, createEmptyGraph())), + this.chatId, + ); + const snapshot = buildSnapshotFromGraph(runtimeLegacyGraph, { + chatId: this.chatId, + nowMs, + revision: normalizeRevision( + options.revision ?? runtimeLegacyGraph?.__stBmePersistence?.revision, + ), + meta: { + migrationCompletedAt: nowMs, + migrationSource, + legacyRetentionUntil, + storagePrimary: OPFS_STORE_KIND, + storageMode: this.storeMode, + }, + }); + const nodeSourceFloorById = new Map(); + const nodes = sanitizeSnapshotRecordArray(snapshot.nodes).map((node) => { + const sourceFloor = deriveNodeSourceFloor(node); + nodeSourceFloorById.set(node.id, sourceFloor); + return sourceFloor == null ? node : { ...node, sourceFloor }; + }); + const edges = sanitizeSnapshotRecordArray(snapshot.edges).map((edge) => { + const sourceFloor = deriveEdgeSourceFloor(edge, nodeSourceFloorById); + return sourceFloor == null ? edge : { ...edge, sourceFloor }; + }); + const importResult = await this.importSnapshot({ + ...snapshot, + nodes, + edges, + tombstones: sanitizeSnapshotRecordArray(snapshot.tombstones), + }, { + mode: "replace", + preserveRevision: true, + revision: normalizeRevision(options.revision ?? snapshot.meta?.revision), + markSyncDirty: true, + }); + return { + migrated: true, + skipped: false, + reason: "migrated", + revision: importResult.revision, + imported: toPlainData(importResult.imported, importResult.imported), + migrationCompletedAt: nowMs, + migrationSource, + legacyRetentionUntil, + }; + } + + async exportSnapshot() { + const snapshot = await this._loadSnapshot(); + return { + meta: toPlainData(snapshot.meta, {}), + nodes: toPlainData(snapshot.nodes, []), + edges: toPlainData(snapshot.edges, []), + tombstones: toPlainData(snapshot.tombstones, []), + state: toPlainData(snapshot.state, {}), + }; + } + + async importSnapshot(snapshot, options = {}) { + return await this._runSerializedWrite("importSnapshot", async () => { + await this._ensureV2Ready({ awaitWrites: false }); + const normalizedSnapshot = sanitizeSnapshot(snapshot); + const mode = normalizeMode(options.mode); + const shouldMarkSyncDirty = options.markSyncDirty !== false; + const nowMs = Date.now(); + const currentSnapshot = + mode === "replace" ? null : await this._loadSnapshot({ awaitWrites: false }); + const nextSnapshot = + mode === "replace" + ? normalizedSnapshot + : { + meta: { + ...(currentSnapshot?.meta || {}), + ...normalizedSnapshot.meta, + }, + state: { + ...(currentSnapshot?.state || {}), + ...normalizedSnapshot.state, + }, + nodes: mergeSnapshotRecords( + currentSnapshot?.nodes || [], + normalizedSnapshot.nodes, + ), + edges: mergeSnapshotRecords( + currentSnapshot?.edges || [], + normalizedSnapshot.edges, + ), + tombstones: mergeSnapshotRecords( + currentSnapshot?.tombstones || [], + normalizedSnapshot.tombstones, + ), + }; + const currentRevision = normalizeRevision(currentSnapshot?.meta?.revision); + const incomingRevision = normalizeRevision(normalizedSnapshot.meta?.revision); + const explicitRevision = normalizeRevision(options.revision); + const requestedRevision = Number.isFinite(Number(options.revision)) + ? explicitRevision + : options.preserveRevision + ? incomingRevision + : currentRevision + 1; + const nextRevision = Math.max(currentRevision + 1, requestedRevision); + nextSnapshot.meta = { + ...nextSnapshot.meta, + chatId: this.chatId, + revision: nextRevision, + lastModified: nowMs, + lastMutationReason: "importSnapshot", + syncDirty: shouldMarkSyncDirty, + syncDirtyReason: shouldMarkSyncDirty ? "importSnapshot" : "", + storagePrimary: OPFS_STORE_KIND, + storageMode: this.storeMode, + schemaVersion: BME_DB_SCHEMA_VERSION, + }; + nextSnapshot.state = normalizeSnapshotState(nextSnapshot); + nextSnapshot.meta.lastProcessedFloor = nextSnapshot.state.lastProcessedFloor; + nextSnapshot.meta.extractionCount = nextSnapshot.state.extractionCount; + nextSnapshot.meta.nodeCount = nextSnapshot.nodes.length; + nextSnapshot.meta.edgeCount = nextSnapshot.edges.length; + nextSnapshot.meta.tombstoneCount = nextSnapshot.tombstones.length; + await this._rewriteBaseFromSnapshot(nextSnapshot, { + headRevision: nextRevision, + reason: "importSnapshot", + }); + return { + mode, + revision: nextRevision, + imported: { + nodes: nextSnapshot.nodes.length, + edges: nextSnapshot.edges.length, + tombstones: nextSnapshot.tombstones.length, + }, + }; + }); + } + + async clearAll() { + return await this._runSerializedWrite("clearAll", async () => { + const manifest = await this._ensureV2Ready({ awaitWrites: false }); + const nextRevision = + normalizeRevision(manifest?.headRevision || manifest?.meta?.revision) + 1; + await this._rewriteBaseFromSnapshot( + { + meta: { + revision: nextRevision, + lastModified: Date.now(), + lastMutationReason: "clearAll", + syncDirty: true, + syncDirtyReason: "clearAll", + storagePrimary: OPFS_STORE_KIND, + storageMode: this.storeMode, + }, + state: { + lastProcessedFloor: META_DEFAULT_LAST_PROCESSED_FLOOR, + extractionCount: META_DEFAULT_EXTRACTION_COUNT, + }, + nodes: [], + edges: [], + tombstones: [], + }, + { + headRevision: nextRevision, + reason: "clearAll", + }, + ); + return { + cleared: true, + revision: nextRevision, + }; + }); + } + + async pruneExpiredTombstones(nowMs = Date.now()) { + const normalizedNow = normalizeTimestamp(nowMs, Date.now()); + const cutoffMs = normalizedNow - BME_TOMBSTONE_RETENTION_MS; + const snapshot = await this._loadSnapshot(); + const nextTombstones = snapshot.tombstones.filter( + (item) => normalizeTimestamp(item?.deletedAt, 0) >= cutoffMs, + ); + const removedCount = snapshot.tombstones.length - nextTombstones.length; + if (removedCount <= 0) { + return { + pruned: 0, + revision: normalizeRevision(snapshot.meta?.revision), + cutoffMs, + }; + } + const nextRevision = normalizeRevision(snapshot.meta?.revision) + 1; + await this.importSnapshot( + { + meta: { + ...snapshot.meta, + revision: nextRevision, + lastModified: normalizedNow, + lastMutationReason: "pruneExpiredTombstones", + syncDirty: true, + syncDirtyReason: "pruneExpiredTombstones", + }, + state: snapshot.state, + nodes: snapshot.nodes, + edges: snapshot.edges, + tombstones: nextTombstones, + }, + { + mode: "replace", + preserveRevision: true, + revision: nextRevision, + markSyncDirty: true, + }, + ); + return { + pruned: removedCount, + revision: nextRevision, + cutoffMs, + }; + } + + async compactNow({ force = false, reason = "manual-compaction" } = {}) { + return await this._runSerializedWrite("compactNow", async () => { + const manifest = await this._ensureV2Ready({ awaitWrites: false }); + const walCount = normalizeNonNegativeInteger(manifest?.wal?.count, 0); + const walBytes = normalizeNonNegativeInteger(manifest?.wal?.totalBytes, 0); + if ( + !force && + walCount < OPFS_V2_WAL_COMPACTION_THRESHOLD && + walBytes < OPFS_V2_WAL_BYTES_THRESHOLD + ) { + return { + compacted: false, + skipped: true, + reason: "below-threshold", + revision: normalizeRevision(manifest?.headRevision || manifest?.meta?.revision), + }; + } + const snapshot = await this._loadSnapshot({ awaitWrites: false }); + const headRevision = normalizeRevision(snapshot.meta?.revision); + await this._rewriteBaseFromSnapshot(snapshot, { + headRevision, + reason, + }); + return { + compacted: true, + skipped: false, + reason, + revision: headRevision, + }; + }); + } + + async _getChatDirectory() { + if (!this._chatDirectoryPromise) { + this._chatDirectoryPromise = (async () => { + const rootDirectory = await this._rootDirectoryFactory(); + if (!rootDirectory || typeof rootDirectory.getDirectoryHandle !== "function") { + throw new Error("OPFS 根目录不可用"); + } + const opfsRoot = await ensureDirectoryHandle( + rootDirectory, + OPFS_ROOT_DIRECTORY_NAME, + ); + const chatsDirectory = await ensureDirectoryHandle( + opfsRoot, + OPFS_CHATS_DIRECTORY_NAME, + ); + return await ensureDirectoryHandle( + chatsDirectory, + buildChatDirectoryName(this.chatId), + ); + })(); + } + return await this._chatDirectoryPromise; + } + + async _getMetaDirectory() { + return await ensureDirectoryHandle( + await this._getChatDirectory(), + OPFS_V2_META_DIRECTORY, + ); + } + + async _getShardDirectory(kind = "nodes") { + return await ensureDirectoryHandle( + await ensureDirectoryHandle( + await this._getChatDirectory(), + OPFS_V2_SHARDS_DIRECTORY, + ), + String(kind || "nodes"), + ); + } + + async _getWalDirectory() { + return await ensureDirectoryHandle( + await this._getChatDirectory(), + OPFS_V2_WAL_DIRECTORY, + ); + } + + async _readRawManifest({ awaitWrites = true } = {}) { + if (awaitWrites) { + await this._awaitPendingWrites(); + } + if (this._manifestCache) { + return this._manifestCache; + } + const chatDirectory = await this._getChatDirectory(); + const manifest = await readJsonFile(chatDirectory, OPFS_MANIFEST_FILENAME, null); + if (!manifest || typeof manifest !== "object" || Array.isArray(manifest)) { + return null; + } + this._manifestCache = manifest; + return manifest; + } + + async _writeManifest(manifest = {}) { + const chatDirectory = await this._getChatDirectory(); + const nextManifest = { + ...manifest, + version: OPFS_MANIFEST_VERSION, + formatVersion: OPFS_FORMAT_VERSION_V2, + chatId: this.chatId, + storeKind: OPFS_STORE_KIND, + storeMode: this.storeMode, + shardLayout: { + nodes: OPFS_V2_NODE_BUCKET_COUNT, + edges: OPFS_V2_EDGE_BUCKET_COUNT, + tombstones: OPFS_V2_TOMBSTONE_BUCKET_COUNT, + ...(manifest?.shardLayout || {}), + }, + meta: { + ...createDefaultMetaValues(this.chatId), + ...(manifest?.meta || {}), + chatId: this.chatId, + schemaVersion: BME_DB_SCHEMA_VERSION, + storagePrimary: OPFS_STORE_KIND, + storageMode: this.storeMode, + }, + }; + await writeJsonFile(chatDirectory, OPFS_MANIFEST_FILENAME, nextManifest); + this._manifestCache = nextManifest; + return nextManifest; + } + + async _ensureV2Ready({ awaitWrites = true } = {}) { + const manifest = await this._readRawManifest({ awaitWrites }); + if (isOpfsV2Manifest(manifest)) { + return await this._writeManifest(manifest); + } + return await this._runSerializedWrite("ensureV2Ready", async () => { + const latestManifest = await this._readRawManifest({ awaitWrites: false }); + if (isOpfsV2Manifest(latestManifest)) { + return latestManifest; + } + const legacySnapshot = await this._tryReadLegacySnapshot(latestManifest); + if (legacySnapshot) { + await this._rewriteBaseFromSnapshot(legacySnapshot, { + headRevision: normalizeRevision(legacySnapshot.meta?.revision), + reason: "legacy-promote", + }); + return this._manifestCache; + } + const emptySnapshot = { + meta: createDefaultMetaValues(this.chatId), + state: { + lastProcessedFloor: META_DEFAULT_LAST_PROCESSED_FLOOR, + extractionCount: META_DEFAULT_EXTRACTION_COUNT, + }, + nodes: [], + edges: [], + tombstones: [], + }; + await this._rewriteBaseFromSnapshot(emptySnapshot, { + headRevision: 0, + reason: "bootstrap", + }); + return this._manifestCache; + }); + } + + async _tryReadLegacySnapshot(rawManifest = null) { + const chatDirectory = await this._getChatDirectory(); + const manifest = + rawManifest && typeof rawManifest === "object" && !Array.isArray(rawManifest) + ? rawManifest + : null; + const fileNames = await listDirectoryFileNames(chatDirectory); + const coreCandidates = fileNames + .map((name) => parseSnapshotFilenameCandidate(name, OPFS_CORE_FILENAME_PREFIX)) + .filter(Boolean); + const auxCandidates = fileNames + .map((name) => parseSnapshotFilenameCandidate(name, OPFS_AUX_FILENAME_PREFIX)) + .filter(Boolean); + let coreFilename = String(manifest?.activeCoreFilename || ""); + let auxFilename = String(manifest?.activeAuxFilename || ""); + if (!coreFilename || !auxFilename) { + const coreByRevision = new Map(); + const auxByRevision = new Map(); + for (const candidate of coreCandidates) { + const current = coreByRevision.get(candidate.revision) || null; + if (!current || candidate.stampMs > current.stampMs) { + coreByRevision.set(candidate.revision, candidate); + } + } + for (const candidate of auxCandidates) { + const current = auxByRevision.get(candidate.revision) || null; + if (!current || candidate.stampMs > current.stampMs) { + auxByRevision.set(candidate.revision, candidate); + } + } + const candidateRevisions = Array.from(coreByRevision.keys()) + .filter((revision) => auxByRevision.has(revision)) + .sort((left, right) => right - left); + if (!candidateRevisions.length) { + return null; + } + const latestRevision = candidateRevisions[0]; + coreFilename = coreByRevision.get(latestRevision)?.filename || ""; + auxFilename = auxByRevision.get(latestRevision)?.filename || ""; + } + if (!coreFilename || !auxFilename) { + return null; + } + const corePayload = await readJsonFile(chatDirectory, coreFilename, null); + const auxPayload = await readJsonFile(chatDirectory, auxFilename, null); + if (!corePayload || !auxPayload) { + return null; + } + const legacyManifest = { + version: Number.isFinite(Number(manifest?.version)) + ? Number(manifest.version) + : OPFS_MANIFEST_VERSION, + chatId: this.chatId, + storeKind: OPFS_STORE_KIND, + storeMode: this.storeMode, + activeCoreFilename: coreFilename, + activeAuxFilename: auxFilename, + meta: + manifest?.meta && typeof manifest.meta === "object" && !Array.isArray(manifest.meta) + ? { + ...createDefaultMetaValues(this.chatId), + ...toPlainData(manifest.meta, {}), + } + : createDefaultMetaValues(this.chatId), + }; + return buildSnapshotFromStoredParts(legacyManifest, corePayload, auxPayload); + } + + async _readRuntimeMetaEntries() { + const metaDirectory = await this._getMetaDirectory(); + const fileNames = await listDirectoryFileNames(metaDirectory); + const output = {}; + for (const fileName of fileNames) { + const key = parseOpfsV2MetaFilename(fileName); + if (!key) continue; + const value = await readJsonFile(metaDirectory, fileName, null); + if (value === null && !OPFS_MANIFEST_META_KEYS.has(key)) continue; + output[key] = value; + } + return output; + } + + async _writeRuntimeMetaEntries(record = {}) { + const metaDirectory = await this._getMetaDirectory(); + const desiredKeys = new Set(); + for (const [rawKey, value] of Object.entries(record || {})) { + const key = normalizeRecordId(rawKey); + if (!key || OPFS_MANIFEST_META_KEYS.has(key)) continue; + desiredKeys.add(key); + await writeJsonFile(metaDirectory, buildOpfsV2MetaFilename(key), value); + } + return desiredKeys; + } + + async _rewriteBaseFromSnapshot(snapshot = {}, { headRevision = 0, reason = "rewrite-base" } = {}) { + const normalizedSnapshot = sanitizeSnapshot(snapshot); + const nowMs = Date.now(); + const nextRevision = normalizeRevision( + headRevision || normalizedSnapshot.meta?.revision, + ); + normalizedSnapshot.state = normalizeSnapshotState(normalizedSnapshot); + normalizedSnapshot.meta = { + ...createDefaultMetaValues(this.chatId, nowMs), + ...toPlainData(normalizedSnapshot.meta, {}), + chatId: this.chatId, + revision: nextRevision, + lastModified: normalizeTimestamp(normalizedSnapshot.meta?.lastModified, nowMs), + lastMutationReason: String( + normalizedSnapshot.meta?.lastMutationReason || reason || "rewrite-base", + ), + storagePrimary: OPFS_STORE_KIND, + storageMode: this.storeMode, + schemaVersion: BME_DB_SCHEMA_VERSION, + lastProcessedFloor: normalizedSnapshot.state.lastProcessedFloor, + extractionCount: normalizedSnapshot.state.extractionCount, + nodeCount: normalizedSnapshot.nodes.length, + edgeCount: normalizedSnapshot.edges.length, + tombstoneCount: normalizedSnapshot.tombstones.length, + }; + const { manifestMeta, runtimeMeta } = splitOpfsV2SnapshotMeta(normalizedSnapshot.meta); + const nodeDirectory = await this._getShardDirectory("nodes"); + const edgeDirectory = await this._getShardDirectory("edges"); + const tombstoneDirectory = await this._getShardDirectory("tombstones"); + const walDirectory = await this._getWalDirectory(); + + const nodeBuckets = groupOpfsV2RecordsByBucket( + normalizedSnapshot.nodes, + OPFS_V2_NODE_BUCKET_COUNT, + ); + const edgeBuckets = groupOpfsV2RecordsByBucket( + normalizedSnapshot.edges, + OPFS_V2_EDGE_BUCKET_COUNT, + ); + const tombstoneBuckets = groupOpfsV2RecordsByBucket( + normalizedSnapshot.tombstones, + OPFS_V2_TOMBSTONE_BUCKET_COUNT, + ); + + for (let index = 0; index < OPFS_V2_NODE_BUCKET_COUNT; index += 1) { + await writeJsonFile( + nodeDirectory, + buildOpfsV2ShardFilename("nodes", index), + nodeBuckets.get(index) || [], + ); + } + for (let index = 0; index < OPFS_V2_EDGE_BUCKET_COUNT; index += 1) { + await writeJsonFile( + edgeDirectory, + buildOpfsV2ShardFilename("edges", index), + edgeBuckets.get(index) || [], + ); + } + for (let index = 0; index < OPFS_V2_TOMBSTONE_BUCKET_COUNT; index += 1) { + await writeJsonFile( + tombstoneDirectory, + buildOpfsV2ShardFilename("tombstones", index), + tombstoneBuckets.get(index) || [], + ); + } + + const metaDirectory = await this._getMetaDirectory(); + const existingMetaFiles = await listDirectoryFileNames(metaDirectory); + const runtimeMetaKeys = new Set(Object.keys(runtimeMeta)); + for (const fileName of existingMetaFiles) { + const key = parseOpfsV2MetaFilename(fileName); + if (!key || runtimeMetaKeys.has(key)) continue; + await deleteFileIfExists(metaDirectory, fileName).catch(() => {}); + } + await this._writeRuntimeMetaEntries(runtimeMeta); + + const walFiles = await listDirectoryFileNames(walDirectory); + for (const walFile of walFiles) { + if (!parseOpfsV2WalFilename(walFile)) continue; + await deleteFileIfExists(walDirectory, walFile).catch(() => {}); + } + + const nextManifest = createEmptyOpfsV2Manifest(this.chatId, this.storeMode); + nextManifest.baseRevision = nextRevision; + nextManifest.headRevision = nextRevision; + nextManifest.lastCompactedRevision = nextRevision; + nextManifest.pendingLogFromRevision = nextRevision + 1; + nextManifest.wal = { + count: 0, + totalBytes: 0, + }; + nextManifest.compaction = { + state: "idle", + queued: false, + lastAt: nowMs, + lastReason: String(reason || "rewrite-base"), + }; + nextManifest.meta = { + ...nextManifest.meta, + ...manifestMeta, + revision: nextRevision, + lastModified: normalizedSnapshot.meta.lastModified, + lastMutationReason: String(reason || normalizedSnapshot.meta.lastMutationReason || "rewrite-base"), + storagePrimary: OPFS_STORE_KIND, + storageMode: this.storeMode, + nodeCount: normalizedSnapshot.nodes.length, + edgeCount: normalizedSnapshot.edges.length, + tombstoneCount: normalizedSnapshot.tombstones.length, + lastProcessedFloor: normalizedSnapshot.state.lastProcessedFloor, + extractionCount: normalizedSnapshot.state.extractionCount, + integrity: toPlainData( + buildOpfsV2IntegritySummary(normalizedSnapshot), + buildOpfsV2IntegritySummary(normalizedSnapshot), + ), + }; + await this._writeManifest(nextManifest); + this._snapshotCache = { + meta: { + ...normalizedSnapshot.meta, + ...runtimeMeta, + ...nextManifest.meta, + }, + state: toPlainData(normalizedSnapshot.state, normalizedSnapshot.state), + nodes: toPlainData(normalizedSnapshot.nodes, normalizedSnapshot.nodes), + edges: toPlainData(normalizedSnapshot.edges, normalizedSnapshot.edges), + tombstones: toPlainData(normalizedSnapshot.tombstones, normalizedSnapshot.tombstones), + }; + + const chatDirectory = await this._getChatDirectory(); + const legacyFiles = await listDirectoryFileNames(chatDirectory); + for (const legacyFile of legacyFiles) { + if ( + parseSnapshotFilenameCandidate(legacyFile, OPFS_CORE_FILENAME_PREFIX) || + parseSnapshotFilenameCandidate(legacyFile, OPFS_AUX_FILENAME_PREFIX) + ) { + await deleteFileIfExists(chatDirectory, legacyFile).catch(() => {}); + } + } + } + + async _readShardRecords(kind = "nodes", bucketIndex = 0) { + const shardDirectory = await this._getShardDirectory(kind); + return sanitizeSnapshotRecordArray( + await readJsonFile( + shardDirectory, + buildOpfsV2ShardFilename(kind, bucketIndex), + [], + ), + ); + } + + async _readWalRecords(manifest = null) { + const normalizedManifest = manifest || (await this._ensureV2Ready()); + const walDirectory = await this._getWalDirectory(); + const walFiles = (await listDirectoryFileNames(walDirectory)) + .map((name) => parseOpfsV2WalFilename(name)) + .filter(Boolean) + .sort((left, right) => left.revision - right.revision); + const pendingFromRevision = normalizeRevision( + normalizedManifest?.pendingLogFromRevision, + ); + const headRevision = normalizeRevision( + normalizedManifest?.headRevision || normalizedManifest?.meta?.revision, + ); + const filtered = walFiles.filter( + (entry) => + entry.revision >= pendingFromRevision && entry.revision <= headRevision, + ); + if (filtered.length > 0) { + let expectedRevision = pendingFromRevision; + for (const entry of filtered) { + if (entry.revision !== expectedRevision) { + throw new Error("opfs-v2-wal-gap"); + } + expectedRevision += 1; + } + if (expectedRevision - 1 !== headRevision) { + throw new Error("opfs-v2-wal-tail-mismatch"); + } + } else if (pendingFromRevision <= headRevision) { + throw new Error("opfs-v2-wal-missing"); + } + const records = []; + for (const entry of filtered) { + const record = await readJsonFile(walDirectory, entry.filename, null); + if (!record) { + throw new Error("opfs-v2-wal-missing-record"); + } + records.push({ + ...record, + revision: entry.revision, + byteLength: JSON.stringify(record).length, + }); + } + return records; + } + + async _loadBaseSnapshotFromV2(manifest = null) { + const normalizedManifest = manifest || (await this._ensureV2Ready()); + const runtimeMeta = await this._readRuntimeMetaEntries(); + const nodes = []; + const edges = []; + const tombstones = []; + for (let index = 0; index < OPFS_V2_NODE_BUCKET_COUNT; index += 1) { + nodes.push(...(await this._readShardRecords("nodes", index))); + } + for (let index = 0; index < OPFS_V2_EDGE_BUCKET_COUNT; index += 1) { + edges.push(...(await this._readShardRecords("edges", index))); + } + for (let index = 0; index < OPFS_V2_TOMBSTONE_BUCKET_COUNT; index += 1) { + tombstones.push(...(await this._readShardRecords("tombstones", index))); + } + const meta = { + ...createDefaultMetaValues(this.chatId), + ...(normalizedManifest?.meta || {}), + ...runtimeMeta, + chatId: this.chatId, + schemaVersion: BME_DB_SCHEMA_VERSION, + storagePrimary: OPFS_STORE_KIND, + storageMode: this.storeMode, + nodeCount: normalizeNonNegativeInteger(normalizedManifest?.meta?.nodeCount, nodes.length), + edgeCount: normalizeNonNegativeInteger(normalizedManifest?.meta?.edgeCount, edges.length), + tombstoneCount: normalizeNonNegativeInteger( + normalizedManifest?.meta?.tombstoneCount, + tombstones.length, + ), + }; + const snapshot = { + meta, + state: normalizeSnapshotState({ + meta, + state: { + lastProcessedFloor: meta.lastProcessedFloor, + extractionCount: meta.extractionCount, + }, + }), + nodes, + edges, + tombstones, + }; + snapshot.meta.lastProcessedFloor = snapshot.state.lastProcessedFloor; + snapshot.meta.extractionCount = snapshot.state.extractionCount; + return snapshot; + } + + async _loadSnapshot({ awaitWrites = true } = {}) { + if (awaitWrites) { + await this._awaitPendingWrites(); + } + const manifest = await this._ensureV2Ready({ awaitWrites: false }); + const headRevision = normalizeRevision( + manifest?.headRevision || manifest?.meta?.revision, + ); + if (this._snapshotCache && normalizeRevision(this._snapshotCache.meta?.revision) === headRevision) { + return this._snapshotCache; + } + const snapshot = await this._loadBaseSnapshotFromV2(manifest); + const walRecords = await this._readWalRecords(manifest); + for (const walRecord of walRecords) { + const nextSnapshot = applyOpfsV2DeltaToSnapshot(snapshot, walRecord.delta, walRecord.committedAt); + nextSnapshot.meta = { + ...nextSnapshot.meta, + revision: normalizeRevision(walRecord.revision), + lastModified: normalizeTimestamp(walRecord.committedAt, Date.now()), + lastMutationReason: String(walRecord.reason || "commitDelta"), + }; + nextSnapshot.state = normalizeSnapshotState(nextSnapshot); + snapshot.meta = nextSnapshot.meta; + snapshot.state = nextSnapshot.state; + snapshot.nodes = nextSnapshot.nodes; + snapshot.edges = nextSnapshot.edges; + snapshot.tombstones = nextSnapshot.tombstones; + } + snapshot.meta = { + ...snapshot.meta, + ...(manifest?.meta || {}), + revision: headRevision, + nodeCount: normalizeNonNegativeInteger(manifest?.meta?.nodeCount, snapshot.nodes.length), + edgeCount: normalizeNonNegativeInteger(manifest?.meta?.edgeCount, snapshot.edges.length), + tombstoneCount: normalizeNonNegativeInteger( + manifest?.meta?.tombstoneCount, + snapshot.tombstones.length, + ), + storagePrimary: OPFS_STORE_KIND, + storageMode: this.storeMode, + }; + snapshot.state = normalizeSnapshotState(snapshot); + snapshot.meta.lastProcessedFloor = snapshot.state.lastProcessedFloor; + snapshot.meta.extractionCount = snapshot.state.extractionCount; + this._snapshotCache = snapshot; + return snapshot; + } + + _maybeScheduleCompaction(manifest = null, reason = "commitDelta") { + if (this._compactionScheduled) return; + const walCount = normalizeNonNegativeInteger(manifest?.wal?.count, 0); + const walBytes = normalizeNonNegativeInteger(manifest?.wal?.totalBytes, 0); + if ( + walCount < OPFS_V2_WAL_COMPACTION_THRESHOLD && + walBytes < OPFS_V2_WAL_BYTES_THRESHOLD + ) { + return; + } + this._compactionScheduled = true; + const scheduler = + typeof globalThis.queueMicrotask === "function" + ? globalThis.queueMicrotask.bind(globalThis) + : (callback) => setTimeout(callback, 0); + scheduler(() => { + this.compactNow({ + force: false, + reason: `auto:${String(reason || "commitDelta")}`, + }) + .catch(() => {}) + .finally(() => { + this._compactionScheduled = false; + }); + }); + } +} diff --git a/sync/bme-sync.js b/sync/bme-sync.js index 74be315..a8078b9 100644 --- a/sync/bme-sync.js +++ b/sync/bme-sync.js @@ -7,6 +7,10 @@ import { const BME_SYNC_FILE_PREFIX = "ST-BME_sync_"; const BME_SYNC_FILE_SUFFIX = ".json"; const BME_SYNC_FILENAME_MAX_LENGTH = 180; +const BME_REMOTE_SYNC_FORMAT_VERSION_V2 = 2; +const BME_REMOTE_SYNC_NODE_CHUNK_SIZE = 2000; +const BME_REMOTE_SYNC_EDGE_CHUNK_SIZE = 4000; +const BME_REMOTE_SYNC_TOMBSTONE_CHUNK_SIZE = 2000; const BME_BACKUP_FILE_PREFIX = "ST-BME_backup_"; const BME_BACKUP_MANIFEST_FILENAME = "ST-BME_BackupManifest.json"; const BME_BACKUP_SCHEMA_VERSION = 1; @@ -793,6 +797,95 @@ function normalizeSyncSnapshot(snapshot = {}, chatId = "") { }; } +function buildRemoteChunkFilename(baseFilename, kind, index, payload) { + const normalizedBase = String(baseFilename || "sync.json").replace(/\.json$/i, ""); + const normalizedKind = String(kind || "chunk").trim().toLowerCase() || "chunk"; + const serialized = JSON.stringify(payload); + const hash = createStableFilenameHash(`${normalizedBase}:${normalizedKind}:${serialized}`); + return `${normalizedBase}.__${normalizedKind}.${String(index).padStart(3, "0")}.${hash}.json`; +} + +function chunkArray(records = [], chunkSize = 1000) { + const normalizedRecords = Array.isArray(records) ? records : []; + const normalizedChunkSize = Math.max(1, Math.floor(Number(chunkSize) || 1)); + const chunks = []; + for (let index = 0; index < normalizedRecords.length; index += normalizedChunkSize) { + chunks.push(normalizedRecords.slice(index, index + normalizedChunkSize)); + } + return chunks; +} + +function buildRemoteSyncEnvelopeV2(snapshot = {}, chatId = "", filename = "") { + const normalizedSnapshot = normalizeSyncSnapshot(snapshot, chatId); + const runtimeMeta = toSerializableData(normalizedSnapshot.meta, {}); + const manifestMeta = { + chatId: normalizedSnapshot.meta.chatId, + revision: normalizeRevision(normalizedSnapshot.meta.revision), + lastModified: normalizeTimestamp(normalizedSnapshot.meta.lastModified, 0), + deviceId: String(normalizedSnapshot.meta.deviceId || "").trim(), + nodeCount: normalizedSnapshot.nodes.length, + edgeCount: normalizedSnapshot.edges.length, + tombstoneCount: normalizedSnapshot.tombstones.length, + schemaVersion: normalizeNonNegativeInteger(normalizedSnapshot.meta.schemaVersion, 1), + }; + const chunkSpecs = [ + ...chunkArray(normalizedSnapshot.nodes, BME_REMOTE_SYNC_NODE_CHUNK_SIZE).map( + (records, index) => ({ kind: "nodes", records, index }), + ), + ...chunkArray(normalizedSnapshot.edges, BME_REMOTE_SYNC_EDGE_CHUNK_SIZE).map( + (records, index) => ({ kind: "edges", records, index }), + ), + ...chunkArray( + normalizedSnapshot.tombstones, + BME_REMOTE_SYNC_TOMBSTONE_CHUNK_SIZE, + ).map((records, index) => ({ kind: "tombstones", records, index })), + { + kind: "runtime-meta", + records: [runtimeMeta], + index: 0, + }, + ]; + const chunks = chunkSpecs.map((chunk) => { + const payload = { + kind: chunk.kind, + index: chunk.index, + records: toSerializableData(chunk.records, []), + }; + const chunkFilename = buildRemoteChunkFilename( + filename, + chunk.kind, + chunk.index, + payload, + ); + return { + kind: chunk.kind, + index: chunk.index, + count: Array.isArray(chunk.records) ? chunk.records.length : 0, + filename: chunkFilename, + payload, + }; + }); + return { + manifest: { + kind: "st-bme-sync", + formatVersion: BME_REMOTE_SYNC_FORMAT_VERSION_V2, + chatId: normalizedSnapshot.meta.chatId, + meta: manifestMeta, + state: toSerializableData(normalizedSnapshot.state, { + lastProcessedFloor: -1, + extractionCount: 0, + }), + chunks: chunks.map((chunk) => ({ + kind: chunk.kind, + index: chunk.index, + count: chunk.count, + filename: chunk.filename, + })), + }, + chunks, + }; +} + function markBackendVectorSnapshotDirty( snapshot = {}, reason = "backend-sync-import-unverified", @@ -1791,7 +1884,19 @@ async function readRemoteSnapshot(chatId, options = {}) { try { const remotePayload = await response.json(); - const snapshot = normalizeSyncSnapshot(remotePayload, normalizedChatId); + let snapshot = null; + if (Number(remotePayload?.formatVersion || 0) === BME_REMOTE_SYNC_FORMAT_VERSION_V2) { + snapshot = await readRemoteSnapshotV2Manifest( + remotePayload, + normalizedChatId, + { + ...options, + filename, + }, + ); + } else { + snapshot = normalizeSyncSnapshot(remotePayload, normalizedChatId); + } rememberResolvedSyncFilename(normalizedChatId, filename); return { exists: true, @@ -1819,32 +1924,112 @@ async function readRemoteSnapshot(chatId, options = {}) { }; } +async function readRemoteJsonFile(filename, options = {}) { + const fetchImpl = getFetch(options); + const response = await fetchImpl( + `/user/files/${encodeURIComponent(filename)}?t=${Date.now()}`, + { + method: "GET", + cache: "no-store", + }, + ); + if (response.status === 404) { + throw new Error("remote-chunk-not-found"); + } + if (!response.ok) { + const errorText = await response.text().catch(() => response.statusText); + throw new Error(errorText || `HTTP ${response.status}`); + } + return await response.json(); +} + +async function readRemoteSnapshotV2Manifest(manifest = {}, chatId = "", options = {}) { + const normalizedChatId = normalizeChatId(chatId); + const chunks = Array.isArray(manifest?.chunks) ? manifest.chunks : []; + const nodes = []; + const edges = []; + const tombstones = []; + let runtimeMeta = {}; + + for (const chunk of chunks) { + const filename = String(chunk?.filename || "").trim(); + if (!filename) continue; + const payload = await readRemoteJsonFile(filename, options); + const records = Array.isArray(payload?.records) ? payload.records : []; + switch (String(chunk.kind || "").trim()) { + case "nodes": + nodes.push(...sanitizeSnapshotRecordArray(records)); + break; + case "edges": + edges.push(...sanitizeSnapshotRecordArray(records)); + break; + case "tombstones": + tombstones.push(...sanitizeSnapshotRecordArray(records)); + break; + case "runtime-meta": + runtimeMeta = + records[0] && typeof records[0] === "object" && !Array.isArray(records[0]) + ? toSerializableData(records[0], {}) + : {}; + break; + default: + break; + } + } + + return normalizeSyncSnapshot( + { + meta: { + ...runtimeMeta, + ...(manifest?.meta || {}), + formatVersion: BME_REMOTE_SYNC_FORMAT_VERSION_V2, + }, + nodes, + edges, + tombstones, + state: toSerializableData(manifest?.state, { + lastProcessedFloor: -1, + extractionCount: 0, + }), + }, + normalizedChatId, + ); +} + async function writeSnapshotToRemote(snapshot, chatId, options = {}) { const normalizedChatId = normalizeChatId(chatId); const normalizedSnapshot = normalizeSyncSnapshot(snapshot, normalizedChatId); const filename = await resolveSyncFilename(normalizedChatId, options); const fetchImpl = getFetch(options); - - const payload = { - meta: toSerializableData(normalizedSnapshot.meta, {}), - nodes: toSerializableData(normalizedSnapshot.nodes, []), - edges: toSerializableData(normalizedSnapshot.edges, []), - tombstones: toSerializableData(normalizedSnapshot.tombstones, []), - state: toSerializableData(normalizedSnapshot.state, { - lastProcessedFloor: -1, - extractionCount: 0, - }), + const syncEnvelope = buildRemoteSyncEnvelopeV2( + normalizedSnapshot, + normalizedChatId, + filename, + ); + const requestHeaders = { + ...getRequestHeadersSafe(options), + "Content-Type": "application/json", }; - + for (const chunk of syncEnvelope.chunks) { + const chunkResponse = await fetchImpl("/api/files/upload", { + method: "POST", + headers: requestHeaders, + body: JSON.stringify({ + name: chunk.filename, + data: encodeBase64Utf8(JSON.stringify(chunk.payload, null, 2)), + }), + }); + if (!chunkResponse.ok) { + const errorText = await chunkResponse.text().catch(() => chunkResponse.statusText); + throw new Error(errorText || `HTTP ${chunkResponse.status}`); + } + } const response = await fetchImpl("/api/files/upload", { method: "POST", - headers: { - ...getRequestHeadersSafe(options), - "Content-Type": "application/json", - }, + headers: requestHeaders, body: JSON.stringify({ name: filename, - data: encodeBase64Utf8(JSON.stringify(payload, null, 2)), + data: encodeBase64Utf8(JSON.stringify(syncEnvelope.manifest, null, 2)), }), }); @@ -1857,7 +2042,7 @@ async function writeSnapshotToRemote(snapshot, chatId, options = {}) { return { filename, path: String(uploadResult?.path || ""), - payload, + payload: syncEnvelope.manifest, }; } @@ -2292,6 +2477,7 @@ export async function upload(chatId, options = {}) { syncDirty: false, syncDirtyReason: "", lastModified: localSnapshot.meta.lastModified, + remoteSyncFormatVersion: BME_REMOTE_SYNC_FORMAT_VERSION_V2, }); return { @@ -2357,6 +2543,7 @@ export async function download(chatId, options = {}) { lastSyncedRevision: remoteRevision, syncDirty: false, syncDirtyReason: "", + remoteSyncFormatVersion: BME_REMOTE_SYNC_FORMAT_VERSION_V2, }); await invokeSyncAppliedHook(options, { @@ -2694,6 +2881,7 @@ export async function syncNow(chatId, options = {}) { syncDirtyReason: "", lastProcessedFloor: mergedSnapshot.state.lastProcessedFloor, extractionCount: mergedSnapshot.state.extractionCount, + remoteSyncFormatVersion: BME_REMOTE_SYNC_FORMAT_VERSION_V2, }); const uploadResult = await writeSnapshotToRemote(mergedSnapshot, normalizedChatId, options); @@ -2703,6 +2891,7 @@ export async function syncNow(chatId, options = {}) { lastSyncedRevision: normalizeRevision(mergedSnapshot.meta.revision), syncDirty: false, syncDirtyReason: "", + remoteSyncFormatVersion: BME_REMOTE_SYNC_FORMAT_VERSION_V2, }); await invokeSyncAppliedHook(options, { @@ -2849,6 +3038,27 @@ export async function deleteRemoteSyncFile(chatId, options = {}) { let lastNotFoundFilename = filenames[0] || ""; for (const filename of filenames) { + try { + const manifestPayload = await readRemoteJsonFile(filename, options); + if (Number(manifestPayload?.formatVersion || 0) === BME_REMOTE_SYNC_FORMAT_VERSION_V2) { + for (const chunk of Array.isArray(manifestPayload?.chunks) ? manifestPayload.chunks : []) { + const chunkFilename = String(chunk?.filename || "").trim(); + if (!chunkFilename) continue; + await fetchImpl("/api/files/delete", { + method: "POST", + headers: { + ...getRequestHeadersSafe(options), + "Content-Type": "application/json", + }, + body: JSON.stringify({ + path: `/user/files/${chunkFilename}`, + }), + }).catch(() => null); + } + } + } catch { + // best-effort chunk cleanup + } const response = await fetchImpl("/api/files/delete", { method: "POST", headers: { diff --git a/tests/indexeddb-sync.mjs b/tests/indexeddb-sync.mjs index 911390a..6c649ac 100644 --- a/tests/indexeddb-sync.mjs +++ b/tests/indexeddb-sync.mjs @@ -130,8 +130,10 @@ function createMockFetchEnvironment() { sanitizeCalls: 0, getCalls: 0, uploadCalls: 0, + uploadChunkCalls: 0, deleteCalls: 0, uploadedPayloads: [], + uploadedChunkPayloads: [], }; const fetch = async (url, options = {}) => { @@ -147,7 +149,6 @@ function createMockFetchEnvironment() { } if (url === "/api/files/upload" && method === "POST") { - logs.uploadCalls += 1; const body = JSON.parse(String(options.body || "{}")); if (!/^[A-Za-z0-9._~-]+$/.test(String(body.name || ""))) { return createJsonResponse( @@ -158,7 +159,15 @@ function createMockFetchEnvironment() { const decoded = __testOnlyDecodeBase64Utf8(body.data); const payload = JSON.parse(decoded); remoteFiles.set(body.name, payload); - logs.uploadedPayloads.push({ + const targetLog = String(body.name || "").includes(".__") + ? "uploadedChunkPayloads" + : "uploadedPayloads"; + if (targetLog === "uploadedChunkPayloads") { + logs.uploadChunkCalls += 1; + } else { + logs.uploadCalls += 1; + } + logs[targetLog].push({ name: body.name, decoded, payload, @@ -288,10 +297,13 @@ async function testUploadPayloadMetaFirstAndDebounce() { const uploadResult = await upload("chat-upload", runtime); assert.equal(uploadResult.uploaded, true); assert.equal(logs.uploadCalls, 1); + assert.equal(logs.uploadChunkCalls > 0, true); const uploadedPayload = logs.uploadedPayloads[0].payload; - assert.equal(Object.keys(uploadedPayload)[0], "meta"); + assert.equal(uploadedPayload.formatVersion, 2); assert.equal(uploadedPayload.meta.revision, 9); + assert.equal(Array.isArray(uploadedPayload.chunks), true); + assert.equal(uploadedPayload.chunks.length > 0, true); scheduleUpload("chat-upload", { ...runtime, @@ -1156,12 +1168,13 @@ async function testDeleteRemoteSyncFile() { const deleteResult = await deleteRemoteSyncFile("chat-delete", runtime); assert.equal(deleteResult.deleted, true); assert.equal(deleteResult.chatId, "chat-delete"); - assert.equal(logs.deleteCalls, 1); + assert.equal(logs.deleteCalls >= 1, true); + const deleteCallsAfterFirstDelete = logs.deleteCalls; const deleteMissingResult = await deleteRemoteSyncFile("chat-delete", runtime); assert.equal(deleteMissingResult.deleted, false); assert.equal(deleteMissingResult.reason, "not-found"); - assert.equal(logs.deleteCalls, 2); + assert.equal(logs.deleteCalls > deleteCallsAfterFirstDelete, true); } async function testDeleteRemoteSyncFileFallsBackToLegacyFilename() { diff --git a/tests/opfs-persistence.mjs b/tests/opfs-persistence.mjs index ed89b42..3be76b7 100644 --- a/tests/opfs-persistence.mjs +++ b/tests/opfs-persistence.mjs @@ -125,6 +125,15 @@ function getChatDirectory(rootDirectory, chatId) { return chatDirectory; } +function getNestedDirectory(directoryHandle, ...names) { + let current = directoryHandle; + for (const name of names) { + current = current?.directories?.get(String(name || "")) || null; + assert.ok(current, `目录必须存在: ${names.join("/")}`); + } + return current; +} + function readJsonFromDirectory(directoryHandle, filename) { assert.ok(directoryHandle.files.has(filename), `文件必须存在: ${filename}`); return JSON.parse(String(directoryHandle.files.get(filename) || "")); @@ -252,12 +261,21 @@ async function testImportExportPersistenceAndFileRotation() { ); const manifestAfterFirstImport = readJsonFromDirectory(chatDirectory, "manifest.json"); - const firstCoreFilename = manifestAfterFirstImport.activeCoreFilename; - const firstAuxFilename = manifestAfterFirstImport.activeAuxFilename; - assert.ok(firstCoreFilename.startsWith("core.snapshot.")); - assert.ok(firstAuxFilename.startsWith("aux.snapshot.")); - assert.ok(chatDirectory.files.has(firstCoreFilename)); - assert.ok(chatDirectory.files.has(firstAuxFilename)); + assert.equal(manifestAfterFirstImport.formatVersion, 2); + assert.equal(manifestAfterFirstImport.baseRevision, 4); + assert.equal(manifestAfterFirstImport.headRevision, 4); + assert.equal(manifestAfterFirstImport.wal.count, 0); + const metaDirectory = getNestedDirectory(chatDirectory, "meta"); + const shardDirectory = getNestedDirectory(chatDirectory, "shards"); + const nodeShardDirectory = getNestedDirectory(shardDirectory, "nodes"); + const edgeShardDirectory = getNestedDirectory(shardDirectory, "edges"); + const tombstoneShardDirectory = getNestedDirectory(shardDirectory, "tombstones"); + const walDirectory = getNestedDirectory(chatDirectory, "wal"); + assert.ok(metaDirectory.files.size > 0); + assert.ok(nodeShardDirectory.files.size > 0); + assert.ok(edgeShardDirectory.files.size > 0); + assert.ok(tombstoneShardDirectory.files.size > 0); + assert.equal(walDirectory.files.size, 0); const firstExportedSnapshot = await store.exportSnapshot(); assert.equal(firstExportedSnapshot.meta.revision, 4); @@ -325,15 +343,14 @@ async function testImportExportPersistenceAndFileRotation() { ); const manifestAfterSecondImport = readJsonFromDirectory(chatDirectory, "manifest.json"); - assert.notEqual(manifestAfterSecondImport.activeCoreFilename, firstCoreFilename); - assert.notEqual(manifestAfterSecondImport.activeAuxFilename, firstAuxFilename); - assert.ok(!chatDirectory.files.has(firstCoreFilename)); - assert.ok(!chatDirectory.files.has(firstAuxFilename)); - assert.deepEqual(Array.from(chatDirectory.files.keys()).sort(), [ - manifestAfterSecondImport.activeAuxFilename, - manifestAfterSecondImport.activeCoreFilename, - "manifest.json", - ].sort()); + assert.equal(manifestAfterSecondImport.formatVersion, 2); + assert.equal(manifestAfterSecondImport.baseRevision, 6); + assert.equal(manifestAfterSecondImport.headRevision, 6); + assert.equal(manifestAfterSecondImport.wal.count, 0); + const secondSnapshot = await reopenedStore.exportSnapshot(); + assert.deepEqual(secondSnapshot.nodes.map((item) => item.id), ["node-2"]); + assert.equal(secondSnapshot.edges.length, 0); + assert.equal(secondSnapshot.tombstones.length, 0); await reopenedStore.close(); } @@ -366,7 +383,7 @@ async function testImportLegacyGraphMigrationAndSkipPaths() { assert.equal(migratedSnapshot.meta.migrationSource, "chat_metadata"); assert.equal(migratedSnapshot.meta.legacyRetentionUntil, nowMs + 5000); assert.equal(migratedSnapshot.meta.storagePrimary, "opfs"); - assert.equal(migratedSnapshot.meta.storageMode, "opfs-shadow"); + assert.equal(migratedSnapshot.meta.storageMode, "opfs-primary"); assert.equal(migratedSnapshot.nodes.length, 1); assert.equal(migratedSnapshot.edges.length, 1); assert.equal(migratedSnapshot.nodes[0]?.sourceFloor, 5); @@ -495,7 +512,7 @@ async function testPruneExpiredTombstonesAndClearAll() { assert.equal(afterClearSnapshot.edges.length, 0); assert.equal(afterClearSnapshot.tombstones.length, 0); assert.equal(afterClearSnapshot.meta.storagePrimary, "opfs"); - assert.equal(afterClearSnapshot.meta.storageMode, "opfs-shadow"); + assert.equal(afterClearSnapshot.meta.storageMode, "opfs-primary"); const emptyAfterClear = await store.isEmpty({ includeTombstones: true }); assert.equal(emptyAfterClear.empty, true); diff --git a/ui/panel.js b/ui/panel.js index 238aa8c..34352db 100644 --- a/ui/panel.js +++ b/ui/panel.js @@ -2075,6 +2075,8 @@ function _refreshTaskPersistence() { ? `活跃中 · queue ${Number(opfsLock.queueDepth || 0)}` : `空闲 · queue ${Number(opfsLock.queueDepth || 0)}` : "—"; + const opfsCompactionState = String(ps.opfsCompactionState?.state || "").trim(); + const opfsCompactionLabel = opfsCompactionState || "—"; const kvs = [ ["加载状态", loadStateLabel], @@ -2084,12 +2086,18 @@ function _refreshTaskPersistence() { ["accepted by", ps.acceptedBy || "—"], ["本地缓存", cacheTierLabel], ["缓存镜像", CACHE_MIRROR_LABELS[ps.cacheMirrorState] || ps.cacheMirrorState || "—"], + ["解析本地引擎", ps.resolvedLocalStore || "—"], + ["本地格式", `v${Number(ps.localStoreFormatVersion || 0) || 1}`], + ["本地迁移", ps.localStoreMigrationState || "—"], ["版本号", ps.revision ?? "—"], ["提交标记", ps.commitMarker ? "存在(诊断锚点)" : "无"], ["诊断层", STORAGE_TIER_LABELS[ps.persistDiagnosticTier] || ps.persistDiagnosticTier || "无"], ["阻塞原因", ps.blockedReason || ps.reason || "—"], ["影子快照", ps.shadowSnapshotUsed ? "已使用" : "未使用"], ["OPFS 写锁", opfsLockLabel], + ["OPFS WAL", `${Number(ps.opfsWalDepth || 0)} 条 / ${Number(ps.opfsPendingBytes || 0)} B`], + ["OPFS 压实", opfsCompactionLabel], + ["远端同步格式", `v${Number(ps.remoteSyncFormatVersion || 0) || 1}`], ]; const kvHtml = kvs.map(([k, v]) => `