fix(authority): checkpoint from SQL source

This commit is contained in:
OpenCode
2026-05-15 16:59:47 +00:00
parent 09fc103f21
commit 83765c6254
2 changed files with 105 additions and 21 deletions

View File

@@ -3083,6 +3083,21 @@ async function exportAuthoritySqlSnapshotProbe(chatId = "", settings = getSettin
}
}
async function exportAuthoritySqlSnapshotForCheckpoint(chatId = "", settings = getSettings()) {
const normalizedChatId = normalizeChatIdCandidate(chatId);
if (!normalizedChatId) return null;
const db = new AuthorityGraphStore(
normalizedChatId,
buildAuthorityGraphStoreOptions(settings),
);
try {
await db.open();
return await db.exportSnapshot({ includeTombstones: false });
} finally {
await db.close?.().catch(() => null);
}
}
async function readAuthorityTriviumStat({
chatId = "",
collectionId = "",
@@ -3346,33 +3361,68 @@ async function writeAuthorityCheckpointFromCurrentGraph(options = {}) {
};
}
ensureCurrentGraphRuntimeState();
if (!currentGraph) {
return {
success: false,
error: "Authority runtime graph unavailable",
};
const reason = String(options.reason || "manual-authority-checkpoint");
const authoritySqlPrimary = shouldUseAuthorityGraphStore(settings, capability);
let checkpointGraph = null;
let revision = 0;
let integrity = "";
let checkpointSource = "runtime";
if (authoritySqlPrimary) {
try {
const sqlSnapshot = await exportAuthoritySqlSnapshotForCheckpoint(chatId, settings);
const sqlRevision = Number(sqlSnapshot?.meta?.revision || 0);
if (!Number.isFinite(sqlRevision) || sqlRevision <= 0) {
return {
success: false,
error: "authority-sql-checkpoint-source-empty",
};
}
checkpointGraph = buildGraphFromSnapshot(sqlSnapshot, { chatId });
revision = sqlRevision;
integrity =
normalizeChatIdCandidate(options.integrity) ||
normalizeChatIdCandidate(sqlSnapshot?.meta?.integrity) ||
getChatMetadataIntegrity(getContext()) ||
graphPersistenceState.metadataIntegrity;
checkpointSource = "authority-sql";
} catch (error) {
return {
success: false,
error: error?.message || String(error) || "authority-sql-checkpoint-source-failed",
};
}
}
const revision = Math.max(
1,
Number(options.revision || 0),
Number(currentGraph?.meta?.revision || 0),
Number(getGraphPersistedRevision(currentGraph) || 0),
Number(graphPersistenceState.revision || 0),
);
const integrity =
normalizeChatIdCandidate(options.integrity) ||
normalizeChatIdCandidate(getGraphPersistenceMeta(currentGraph)?.integrity) ||
getChatMetadataIntegrity(getContext()) ||
graphPersistenceState.metadataIntegrity;
const reason = String(options.reason || "manual-authority-checkpoint");
const checkpoint = buildLukerGraphCheckpointV2(currentGraph, {
if (!checkpointGraph) {
ensureCurrentGraphRuntimeState();
if (!currentGraph) {
return {
success: false,
error: "Authority runtime graph unavailable",
};
}
checkpointGraph = currentGraph;
revision = Math.max(
1,
Number(options.revision || 0),
Number(currentGraph?.meta?.revision || 0),
Number(getGraphPersistedRevision(currentGraph) || 0),
Number(graphPersistenceState.revision || 0),
);
integrity =
normalizeChatIdCandidate(options.integrity) ||
normalizeChatIdCandidate(getGraphPersistenceMeta(currentGraph)?.integrity) ||
getChatMetadataIntegrity(getContext()) ||
graphPersistenceState.metadataIntegrity;
}
const checkpoint = buildLukerGraphCheckpointV2(checkpointGraph, {
revision,
chatId,
integrity,
reason,
storageTier: "authority-sql-primary",
storageTier: authoritySqlPrimary ? "authority-sql-primary" : "runtime-checkpoint",
persistedAt: updatedAt,
});
if (!checkpoint) {
@@ -3410,6 +3460,7 @@ async function writeAuthorityCheckpointFromCurrentGraph(options = {}) {
path: writeResult.path,
revision,
checkpointRevision: Number(checkpoint.revision || revision || 0),
source: checkpointSource,
auditSummary: auditResult?.audit?.summary || null,
auditActions: auditResult?.audit?.actions || [],
},

View File

@@ -155,6 +155,7 @@ function normalizeAuthorityVectorConfig(settings = {}, overrides = {}) {
function createAuthorityBlobAdapter() {
return {
async writeJson(path = "", payload = null) {
globalThis.__authorityBlobWrites?.set(String(path || ""), structuredClone(payload));
return { path, payload, written: true };
},
};
@@ -351,6 +352,8 @@ async function createGraphPersistenceHarness({
}
const authoritySnapshotMap = new Map();
const authorityBlobWrites = new Map();
globalThis.__authorityBlobWrites = authorityBlobWrites;
function getAuthoritySnapshotForChat(targetChatId = "") {
const normalizedChatId = String(targetChatId || "");
@@ -673,6 +676,7 @@ async function createGraphPersistenceHarness({
),
__indexedDbSnapshots: indexedDbSnapshotMap,
__authoritySnapshots: authoritySnapshotMap,
__authorityBlobWrites: authorityBlobWrites,
__getAuthoritySnapshotForChat: getAuthoritySnapshotForChat,
__setAuthoritySnapshotForChat: setAuthoritySnapshotForChat,
sessionStorage: storage,
@@ -1645,6 +1649,7 @@ result = {
persistExtractionBatchResult,
shouldUseAuthorityJobs,
shouldUseAuthorityGraphStore,
writeAuthorityCheckpointFromCurrentGraph,
onRebuildLocalCacheFromLukerSidecar,
saveGraphToIndexedDb,
cloneGraphForPersistence,
@@ -1735,6 +1740,12 @@ result = {
getAuthoritySnapshotForChat(chatId) {
return globalThis.__getAuthoritySnapshotForChat(chatId);
},
getAuthorityBlobWrites() {
return Array.from(globalThis.__authorityBlobWrites.entries()).map(([path, payload]) => [
path,
structuredClone(payload),
]);
},
};
`,
].join("\n"),
@@ -4779,6 +4790,28 @@ result = {
result.revision,
"Authority SQL snapshot should receive the accepted persist revision",
);
harness.api.setCurrentGraph(
stampPersistedGraph(
createMeaningfulGraph(chatId, "runtime-stale-checkpoint"),
{
revision: 1,
integrity: persistenceChatId,
chatId,
reason: "runtime-stale-checkpoint",
},
),
);
const checkpointResult = await harness.api.writeAuthorityCheckpointFromCurrentGraph({
reason: "authority-sql-checkpoint-source-test",
});
assert.equal(checkpointResult.success, true);
assert.equal(checkpointResult.result.source, "authority-sql");
assert.equal(checkpointResult.result.checkpointRevision, result.revision);
const checkpointPayload = Array.from(globalThis.__authorityBlobWrites.entries()).at(-1)?.[1];
assert.equal(checkpointPayload?.revision, result.revision);
const checkpointGraph = deserializeGraph(checkpointPayload?.serializedGraph || "{}");
assert.equal(checkpointGraph.nodes[0]?.fields?.title, "事件-luker-authority-sql");
assert.notEqual(checkpointGraph.nodes[0]?.fields?.title, "事件-runtime-stale-checkpoint");
}
{