diff --git a/index.js b/index.js index 4de1b72..249dcba 100644 --- a/index.js +++ b/index.js @@ -3083,6 +3083,21 @@ async function exportAuthoritySqlSnapshotProbe(chatId = "", settings = getSettin } } +async function exportAuthoritySqlSnapshotForCheckpoint(chatId = "", settings = getSettings()) { + const normalizedChatId = normalizeChatIdCandidate(chatId); + if (!normalizedChatId) return null; + const db = new AuthorityGraphStore( + normalizedChatId, + buildAuthorityGraphStoreOptions(settings), + ); + try { + await db.open(); + return await db.exportSnapshot({ includeTombstones: false }); + } finally { + await db.close?.().catch(() => null); + } +} + async function readAuthorityTriviumStat({ chatId = "", collectionId = "", @@ -3346,33 +3361,68 @@ async function writeAuthorityCheckpointFromCurrentGraph(options = {}) { }; } - ensureCurrentGraphRuntimeState(); - if (!currentGraph) { - return { - success: false, - error: "Authority runtime graph unavailable", - }; + const reason = String(options.reason || "manual-authority-checkpoint"); + const authoritySqlPrimary = shouldUseAuthorityGraphStore(settings, capability); + let checkpointGraph = null; + let revision = 0; + let integrity = ""; + let checkpointSource = "runtime"; + + if (authoritySqlPrimary) { + try { + const sqlSnapshot = await exportAuthoritySqlSnapshotForCheckpoint(chatId, settings); + const sqlRevision = Number(sqlSnapshot?.meta?.revision || 0); + if (!Number.isFinite(sqlRevision) || sqlRevision <= 0) { + return { + success: false, + error: "authority-sql-checkpoint-source-empty", + }; + } + checkpointGraph = buildGraphFromSnapshot(sqlSnapshot, { chatId }); + revision = sqlRevision; + integrity = + normalizeChatIdCandidate(options.integrity) || + normalizeChatIdCandidate(sqlSnapshot?.meta?.integrity) || + getChatMetadataIntegrity(getContext()) || + graphPersistenceState.metadataIntegrity; + checkpointSource = "authority-sql"; + } catch (error) { + return { + success: false, + error: error?.message || String(error) || "authority-sql-checkpoint-source-failed", + }; + } } - const revision = Math.max( - 1, - Number(options.revision || 0), - Number(currentGraph?.meta?.revision || 0), - Number(getGraphPersistedRevision(currentGraph) || 0), - Number(graphPersistenceState.revision || 0), - ); - const integrity = - normalizeChatIdCandidate(options.integrity) || - normalizeChatIdCandidate(getGraphPersistenceMeta(currentGraph)?.integrity) || - getChatMetadataIntegrity(getContext()) || - graphPersistenceState.metadataIntegrity; - const reason = String(options.reason || "manual-authority-checkpoint"); - const checkpoint = buildLukerGraphCheckpointV2(currentGraph, { + if (!checkpointGraph) { + ensureCurrentGraphRuntimeState(); + if (!currentGraph) { + return { + success: false, + error: "Authority runtime graph unavailable", + }; + } + checkpointGraph = currentGraph; + revision = Math.max( + 1, + Number(options.revision || 0), + Number(currentGraph?.meta?.revision || 0), + Number(getGraphPersistedRevision(currentGraph) || 0), + Number(graphPersistenceState.revision || 0), + ); + integrity = + normalizeChatIdCandidate(options.integrity) || + normalizeChatIdCandidate(getGraphPersistenceMeta(currentGraph)?.integrity) || + getChatMetadataIntegrity(getContext()) || + graphPersistenceState.metadataIntegrity; + } + + const checkpoint = buildLukerGraphCheckpointV2(checkpointGraph, { revision, chatId, integrity, reason, - storageTier: "authority-sql-primary", + storageTier: authoritySqlPrimary ? "authority-sql-primary" : "runtime-checkpoint", persistedAt: updatedAt, }); if (!checkpoint) { @@ -3410,6 +3460,7 @@ async function writeAuthorityCheckpointFromCurrentGraph(options = {}) { path: writeResult.path, revision, checkpointRevision: Number(checkpoint.revision || revision || 0), + source: checkpointSource, auditSummary: auditResult?.audit?.summary || null, auditActions: auditResult?.audit?.actions || [], }, diff --git a/tests/graph-persistence.mjs b/tests/graph-persistence.mjs index d5383d2..dd82cea 100644 --- a/tests/graph-persistence.mjs +++ b/tests/graph-persistence.mjs @@ -155,6 +155,7 @@ function normalizeAuthorityVectorConfig(settings = {}, overrides = {}) { function createAuthorityBlobAdapter() { return { async writeJson(path = "", payload = null) { + globalThis.__authorityBlobWrites?.set(String(path || ""), structuredClone(payload)); return { path, payload, written: true }; }, }; @@ -351,6 +352,8 @@ async function createGraphPersistenceHarness({ } const authoritySnapshotMap = new Map(); + const authorityBlobWrites = new Map(); + globalThis.__authorityBlobWrites = authorityBlobWrites; function getAuthoritySnapshotForChat(targetChatId = "") { const normalizedChatId = String(targetChatId || ""); @@ -673,6 +676,7 @@ async function createGraphPersistenceHarness({ ), __indexedDbSnapshots: indexedDbSnapshotMap, __authoritySnapshots: authoritySnapshotMap, + __authorityBlobWrites: authorityBlobWrites, __getAuthoritySnapshotForChat: getAuthoritySnapshotForChat, __setAuthoritySnapshotForChat: setAuthoritySnapshotForChat, sessionStorage: storage, @@ -1645,6 +1649,7 @@ result = { persistExtractionBatchResult, shouldUseAuthorityJobs, shouldUseAuthorityGraphStore, + writeAuthorityCheckpointFromCurrentGraph, onRebuildLocalCacheFromLukerSidecar, saveGraphToIndexedDb, cloneGraphForPersistence, @@ -1735,6 +1740,12 @@ result = { getAuthoritySnapshotForChat(chatId) { return globalThis.__getAuthoritySnapshotForChat(chatId); }, + getAuthorityBlobWrites() { + return Array.from(globalThis.__authorityBlobWrites.entries()).map(([path, payload]) => [ + path, + structuredClone(payload), + ]); + }, }; `, ].join("\n"), @@ -4779,6 +4790,28 @@ result = { result.revision, "Authority SQL snapshot should receive the accepted persist revision", ); + harness.api.setCurrentGraph( + stampPersistedGraph( + createMeaningfulGraph(chatId, "runtime-stale-checkpoint"), + { + revision: 1, + integrity: persistenceChatId, + chatId, + reason: "runtime-stale-checkpoint", + }, + ), + ); + const checkpointResult = await harness.api.writeAuthorityCheckpointFromCurrentGraph({ + reason: "authority-sql-checkpoint-source-test", + }); + assert.equal(checkpointResult.success, true); + assert.equal(checkpointResult.result.source, "authority-sql"); + assert.equal(checkpointResult.result.checkpointRevision, result.revision); + const checkpointPayload = Array.from(globalThis.__authorityBlobWrites.entries()).at(-1)?.[1]; + assert.equal(checkpointPayload?.revision, result.revision); + const checkpointGraph = deserializeGraph(checkpointPayload?.serializedGraph || "{}"); + assert.equal(checkpointGraph.nodes[0]?.fields?.title, "事件-luker-authority-sql"); + assert.notEqual(checkpointGraph.nodes[0]?.fields?.title, "事件-runtime-stale-checkpoint"); } {