From 3a6f577678230badd244083ca5559c8aad84b005 Mon Sep 17 00:00:00 2001 From: Youzini-afk <13153778771cx@gmail.com> Date: Tue, 28 Apr 2026 16:48:56 +0800 Subject: [PATCH] feat(authority): add checkpoint audit scaffolding --- index.js | 331 +++++++++++++++++++ maintenance/authority-consistency.js | 476 +++++++++++++++++++++++++++ tests/authority-consistency.mjs | 149 +++++++++ ui/panel.js | 132 ++++++++ ui/ui-status.js | 11 + 5 files changed, 1099 insertions(+) create mode 100644 maintenance/authority-consistency.js create mode 100644 tests/authority-consistency.mjs diff --git a/index.js b/index.js index fa3d751..7f78560 100644 --- a/index.js +++ b/index.js @@ -369,6 +369,7 @@ import { testVectorConnection, validateVectorConfig, } from "./vector/vector-index.js"; +import { createAuthorityTriviumClient } from "./vector/authority-vector-primary-adapter.js"; import { buildAuthorityJobIdempotencyKey, createAuthorityJobAdapter, @@ -376,6 +377,10 @@ import { normalizeAuthorityJobConfig, } from "./maintenance/authority-job-adapter.js"; import { trackAuthorityJobUntilTerminal } from "./maintenance/authority-job-tracker.js"; +import { + applyAuthorityCheckpointToStore, + buildAuthorityConsistencyAudit, +} from "./maintenance/authority-consistency.js"; import { createAuthorityBlobAdapter, normalizeAuthorityBlobConfig, @@ -1831,6 +1836,55 @@ function getGraphPersistenceLiveState() { authorityRecentJobsHasMore: Boolean( graphPersistenceState.authorityRecentJobsHasMore, ), + authorityBlobReady: Boolean(authorityRuntime.capability.blobReady), + authorityBlobState: String(graphPersistenceState.authorityBlobState || "idle"), + authorityLastBlobEvent: cloneRuntimeDebugValue( + graphPersistenceState.authorityLastBlobEvent, + null, + ), + authorityLastBlobAction: String(graphPersistenceState.authorityLastBlobAction || ""), + authorityLastBlobBackend: String(graphPersistenceState.authorityLastBlobBackend || ""), + authorityLastBlobPath: String(graphPersistenceState.authorityLastBlobPath || ""), + authorityLastBlobReason: String(graphPersistenceState.authorityLastBlobReason || ""), + authorityLastBlobError: String(graphPersistenceState.authorityLastBlobError || ""), + authorityLastBlobUpdatedAt: String( + graphPersistenceState.authorityLastBlobUpdatedAt || "", + ), + authorityBlobCheckpointPath: String( + graphPersistenceState.authorityBlobCheckpointPath || "", + ), + authorityBlobCheckpointRevision: Number( + graphPersistenceState.authorityBlobCheckpointRevision || 0, + ), + authorityBlobCheckpointUpdatedAt: String( + graphPersistenceState.authorityBlobCheckpointUpdatedAt || "", + ), + authorityConsistencyState: String( + graphPersistenceState.authorityConsistencyState || "idle", + ), + authorityConsistencyAudit: cloneRuntimeDebugValue( + graphPersistenceState.authorityConsistencyAudit, + null, + ), + authorityConsistencyUpdatedAt: String( + graphPersistenceState.authorityConsistencyUpdatedAt || "", + ), + authorityConsistencyError: String( + graphPersistenceState.authorityConsistencyError || "", + ), + authorityCheckpointRestoreState: String( + graphPersistenceState.authorityCheckpointRestoreState || "idle", + ), + authorityCheckpointRestoreResult: cloneRuntimeDebugValue( + graphPersistenceState.authorityCheckpointRestoreResult, + null, + ), + authorityCheckpointRestoreUpdatedAt: String( + graphPersistenceState.authorityCheckpointRestoreUpdatedAt || "", + ), + authorityCheckpointRestoreError: String( + graphPersistenceState.authorityCheckpointRestoreError || "", + ), authorityBrowserCacheMode: String( authorityRuntime.browserState.mode || "minimal", ), @@ -2325,6 +2379,11 @@ async function readAuthorityLukerCheckpointBlob(chatId = "", options = {}) { reason: exists ? "checkpoint-found" : "checkpoint-missing", revision: Number(result?.payload?.revision || 0), }); + updateGraphPersistenceState({ + authorityBlobCheckpointPath: result?.path || path, + authorityBlobCheckpointRevision: Number(result?.payload?.revision || 0), + authorityBlobCheckpointUpdatedAt: new Date().toISOString(), + }); return { ok: result?.ok !== false, exists, @@ -2371,6 +2430,264 @@ async function readLukerGraphSidecarV2WithAuthorityBlob(context = null, options }; } +async function exportAuthoritySqlSnapshotProbe(chatId = "", settings = getSettings()) { + const normalizedChatId = normalizeChatIdCandidate(chatId); + if (!normalizedChatId) return null; + const db = new AuthorityGraphStore( + normalizedChatId, + buildAuthorityGraphStoreOptions(settings), + ); + try { + await db.open(); + return await db.exportSnapshotProbe(); + } finally { + await db.close?.().catch(() => null); + } +} + +async function readAuthorityTriviumStat({ + chatId = "", + collectionId = "", + settings = getSettings(), +} = {}) { + const normalizedChatId = normalizeChatIdCandidate(chatId); + if (!normalizedChatId) return null; + const normalizedCollectionId = + normalizeChatIdCandidate(collectionId) || + buildVectorCollectionId(normalizedChatId); + const config = normalizeAuthorityVectorConfig( + settings, + buildAuthorityGraphStoreOptions(settings), + ); + const client = createAuthorityTriviumClient(config, { + fetchImpl: globalThis.fetch?.bind(globalThis), + headerProvider: + typeof getRequestHeaders === "function" ? () => getRequestHeaders() : null, + }); + return await client.stat({ + namespace: normalizedCollectionId, + collectionId: normalizedCollectionId, + chatId: normalizedChatId, + }); +} + +async function runAuthorityConsistencyAudit(options = {}) { + const settings = getSettings(); + const { capability } = getAuthorityRuntimeSnapshot(settings); + const updatedAt = new Date().toISOString(); + const chatId = normalizeChatIdCandidate( + options.chatId || getCurrentChatId() || graphPersistenceState.chatId, + ); + if (!chatId) { + return { + success: false, + error: "missing-chat-id", + }; + } + + updateGraphPersistenceState({ + authorityConsistencyState: "running", + authorityConsistencyUpdatedAt: updatedAt, + authorityConsistencyError: "", + }); + refreshPanelLiveState(); + + try { + const collectionId = + normalizeChatIdCandidate(options.collectionId) || + normalizeChatIdCandidate(currentGraph?.vectorIndexState?.collectionId) || + buildVectorCollectionId(chatId); + const [sqlProbe, triviumProbe, blobProbe] = await Promise.all([ + capability.storagePrimaryReady + ? exportAuthoritySqlSnapshotProbe(chatId, settings) + .then((value) => ({ value, error: null })) + .catch((error) => ({ value: null, error })) + : Promise.resolve({ value: null, error: null }), + capability.triviumPrimaryReady + ? readAuthorityTriviumStat({ + chatId, + collectionId, + settings, + }) + .then((value) => ({ value, error: null })) + .catch((error) => ({ value: null, error })) + : Promise.resolve({ value: null, error: null }), + capability.blobReady + ? readAuthorityLukerCheckpointBlob(chatId) + .then((value) => ({ value, error: null })) + .catch((error) => ({ value: null, error })) + : Promise.resolve({ value: null, error: null }), + ]); + const audit = buildAuthorityConsistencyAudit({ + updatedAt, + chatId, + collectionId, + capability, + runtimeGraph: currentGraph, + graphPersistenceState, + sqlSnapshot: sqlProbe.value, + sqlError: sqlProbe.error, + triviumStat: triviumProbe.value, + triviumError: triviumProbe.error, + blobResult: blobProbe.value, + blobError: blobProbe.error, + lastJob: graphPersistenceState.authorityLastJob, + }); + updateGraphPersistenceState({ + authorityConsistencyState: audit.summary.level, + authorityConsistencyAudit: cloneRuntimeDebugValue(audit, null), + authorityConsistencyUpdatedAt: updatedAt, + authorityConsistencyError: "", + }); + refreshPanelLiveState(); + return { + success: true, + audit, + }; + } catch (error) { + const message = + error?.message || String(error) || "Authority consistency audit failed"; + updateGraphPersistenceState({ + authorityConsistencyState: "error", + authorityConsistencyUpdatedAt: updatedAt, + authorityConsistencyError: message, + }); + refreshPanelLiveState(); + return { + success: false, + error: message, + }; + } +} + +async function restoreAuthorityCheckpointFromBlob(options = {}) { + const settings = getSettings(); + const { capability } = getAuthorityRuntimeSnapshot(settings); + const updatedAt = new Date().toISOString(); + const chatId = normalizeChatIdCandidate( + options.chatId || getCurrentChatId() || graphPersistenceState.chatId, + ); + if (!chatId) { + return { + success: false, + error: "missing-chat-id", + }; + } + if (!capability.storagePrimaryReady) { + updateGraphPersistenceState({ + authorityCheckpointRestoreState: "error", + authorityCheckpointRestoreUpdatedAt: updatedAt, + authorityCheckpointRestoreError: "Authority SQL unavailable", + }); + refreshPanelLiveState(); + return { + success: false, + error: "Authority SQL unavailable", + }; + } + + updateGraphPersistenceState({ + authorityCheckpointRestoreState: "running", + authorityCheckpointRestoreUpdatedAt: updatedAt, + authorityCheckpointRestoreError: "", + }); + refreshPanelLiveState(); + + let targetDb = null; + try { + const blobResult = await readAuthorityLukerCheckpointBlob(chatId); + if (!blobResult?.exists || !blobResult?.checkpoint) { + const message = blobResult?.reason || "Authority checkpoint missing"; + updateGraphPersistenceState({ + authorityCheckpointRestoreState: "error", + authorityCheckpointRestoreUpdatedAt: updatedAt, + authorityCheckpointRestoreError: message, + }); + refreshPanelLiveState(); + return { + success: false, + error: message, + }; + } + + targetDb = new AuthorityGraphStore(chatId, buildAuthorityGraphStoreOptions(settings)); + const restoreResult = await applyAuthorityCheckpointToStore( + targetDb, + blobResult.checkpoint, + { + chatId, + path: blobResult.path, + source: "authority-blob-checkpoint-restore", + storagePrimary: AUTHORITY_GRAPH_STORE_KIND, + storageMode: AUTHORITY_GRAPH_STORE_MODE, + markSyncDirty: false, + }, + ); + await targetDb.close?.().catch(() => null); + targetDb = null; + + const preferredStore = getPreferredGraphLocalStorePresentationSync(settings); + const authorityActive = + isAuthorityGraphStorePresentation(preferredStore) || + isAuthorityGraphStoreDb(currentDb); + if (authorityActive) { + await refreshCurrentChatLocalStoreBinding({ + chatId, + forceCapabilityRefresh: true, + reopenCurrentDb: true, + source: "authority-checkpoint-restore", + }); + syncGraphLoadFromLiveContext({ + source: "authority-checkpoint-restore", + force: true, + }); + } + + const auditResult = await runAuthorityConsistencyAudit({ chatId }); + const result = { + restored: restoreResult.restored === true, + revision: Number(restoreResult.revision || 0), + path: blobResult.path, + checkpointRevision: Number(blobResult?.checkpoint?.revision || 0), + reloadApplied: authorityActive, + auditSummary: auditResult?.audit?.summary || null, + auditDrift: auditResult?.audit?.drift || null, + }; + updateGraphPersistenceState({ + authorityCheckpointRestoreState: restoreResult.restored === true ? "success" : "error", + authorityCheckpointRestoreResult: cloneRuntimeDebugValue(result, null), + authorityCheckpointRestoreUpdatedAt: updatedAt, + authorityCheckpointRestoreError: + restoreResult.restored === true + ? "" + : String(restoreResult.reason || restoreResult.error || "restore-failed"), + authorityBlobCheckpointPath: blobResult.path, + authorityBlobCheckpointRevision: Number(blobResult?.checkpoint?.revision || 0), + authorityBlobCheckpointUpdatedAt: updatedAt, + }); + refreshPanelLiveState(); + return { + success: restoreResult.restored === true, + result, + }; + } catch (error) { + const message = + error?.message || String(error) || "Authority checkpoint restore failed"; + updateGraphPersistenceState({ + authorityCheckpointRestoreState: "error", + authorityCheckpointRestoreUpdatedAt: updatedAt, + authorityCheckpointRestoreError: message, + }); + refreshPanelLiveState(); + return { + success: false, + error: message, + }; + } finally { + await targetDb?.close?.().catch(() => null); + } +} + async function submitAuthorityVectorRebuildJob({ config = null, range = null, @@ -20929,6 +21246,18 @@ async function onRefreshAuthorityJobs() { }); } +async function onRunAuthorityConsistencyAudit() { + return await runAuthorityConsistencyAudit({ + reason: "panel-authority-consistency-audit", + }); +} + +async function onRestoreAuthorityCheckpoint() { + return await restoreAuthorityCheckpointFromBlob({ + reason: "panel-authority-checkpoint-restore", + }); +} + async function onReembedDirect() { return await onReembedDirectController({ getEmbeddingConfig, @@ -21508,6 +21837,8 @@ async function onCompactLukerSidecar() { rebuildVectorRange: (range) => onRebuildVectorIndex(range), requeueAuthorityJob: async (jobId) => await requeueAuthorityJob(jobId), refreshAuthorityJobs: onRefreshAuthorityJobs, + runAuthorityConsistencyAudit: onRunAuthorityConsistencyAudit, + restoreAuthorityCheckpoint: onRestoreAuthorityCheckpoint, reembedDirect: onReembedDirect, reroll: onReroll, clearGraph: onClearGraph, diff --git a/maintenance/authority-consistency.js b/maintenance/authority-consistency.js new file mode 100644 index 0000000..3f5af02 --- /dev/null +++ b/maintenance/authority-consistency.js @@ -0,0 +1,476 @@ +import { deserializeGraph } from "../graph/graph.js"; +import { normalizeGraphRuntimeState } from "../runtime/runtime-state.js"; +import { buildSnapshotFromGraph } from "../sync/bme-db.js"; + +function clonePlain(value, fallbackValue = null) { + if (value == null) return fallbackValue; + if (typeof globalThis.structuredClone === "function") { + try { + return globalThis.structuredClone(value); + } catch { + } + } + try { + return JSON.parse(JSON.stringify(value)); + } catch { + return fallbackValue; + } +} + +function normalizeChatId(value) { + return String(value ?? "").trim(); +} + +function normalizeOptionalInteger(value) { + const parsed = Number(value); + if (!Number.isFinite(parsed)) return null; + return Math.max(0, Math.floor(parsed)); +} + +function normalizeIssue(severity, code, message) { + return { + severity, + code: String(code || "unknown"), + message: String(message || ""), + }; +} + +function readNestedValue(source = null, path = []) { + let current = source; + for (const key of path) { + if (!current || typeof current !== "object" || Array.isArray(current)) { + return undefined; + } + current = current[key]; + } + return current; +} + +function readFirstValue(source = null, candidates = []) { + for (const candidate of candidates) { + const path = Array.isArray(candidate) ? candidate : [candidate]; + const value = readNestedValue(source, path); + if (value !== undefined && value !== null && value !== "") { + return value; + } + } + return undefined; +} + +function readFirstInteger(source = null, candidates = []) { + const value = readFirstValue(source, candidates); + return normalizeOptionalInteger(value); +} + +function readFirstString(source = null, candidates = []) { + const value = readFirstValue(source, candidates); + return value == null ? "" : String(value || "").trim(); +} + +function normalizeErrorMessage(error = null) { + if (!error) return ""; + return String(error?.message || error || "").trim(); +} + +function buildRevisionDelta(left = null, right = null) { + if (!Number.isFinite(left) || !Number.isFinite(right)) return null; + return Number(left) - Number(right); +} + +export function buildAuthorityCheckpointImportSnapshot(checkpoint = null, options = {}) { + const normalizedCheckpoint = + checkpoint && typeof checkpoint === "object" && !Array.isArray(checkpoint) + ? checkpoint + : null; + if (!normalizedCheckpoint) { + return { + ok: false, + reason: "checkpoint-missing", + snapshot: null, + }; + } + + const chatId = normalizeChatId(options.chatId || normalizedCheckpoint.chatId); + if (!chatId) { + return { + ok: false, + reason: "checkpoint-chat-id-missing", + snapshot: null, + }; + } + + const serializedGraph = String(normalizedCheckpoint.serializedGraph || "").trim(); + const rawGraph = + normalizedCheckpoint.graph && + typeof normalizedCheckpoint.graph === "object" && + !Array.isArray(normalizedCheckpoint.graph) + ? clonePlain(normalizedCheckpoint.graph, null) + : null; + if (!serializedGraph && !rawGraph) { + return { + ok: false, + reason: "checkpoint-serialized-graph-missing", + snapshot: null, + }; + } + + try { + const restoredGraph = normalizeGraphRuntimeState( + rawGraph || deserializeGraph(serializedGraph), + chatId, + ); + const revision = Math.max( + 0, + normalizeOptionalInteger(options.revision) ?? -1, + normalizeOptionalInteger(normalizedCheckpoint.revision) ?? -1, + ); + const source = String(options.source || "authority-checkpoint-restore").trim() || + "authority-checkpoint-restore"; + const integrity = String( + normalizedCheckpoint.integrity || options.integrity || "", + ).trim(); + const snapshot = buildSnapshotFromGraph(restoredGraph, { + chatId, + revision, + lastModified: Date.now(), + meta: { + integrity, + storagePrimary: String(options.storagePrimary || "authority"), + storageMode: String(options.storageMode || "authority-sql-primary"), + lastMutationReason: source, + authorityCheckpointSource: source, + authorityCheckpointChatId: chatId, + authorityCheckpointRevision: revision, + authorityCheckpointPersistedAt: String( + normalizedCheckpoint.persistedAt || "", + ), + authorityCheckpointPath: String(options.path || ""), + }, + }); + return { + ok: true, + reason: "checkpoint-import-snapshot-ready", + snapshot, + checkpoint: { + chatId, + revision, + integrity, + persistedAt: String(normalizedCheckpoint.persistedAt || ""), + hasSerializedGraph: Boolean(serializedGraph || rawGraph), + }, + }; + } catch (error) { + return { + ok: false, + reason: "checkpoint-invalid", + error, + snapshot: null, + }; + } +} + +export async function applyAuthorityCheckpointToStore( + targetStore, + checkpoint = null, + options = {}, +) { + const prepared = buildAuthorityCheckpointImportSnapshot(checkpoint, options); + if (!prepared.ok || !prepared.snapshot) { + return { + ...prepared, + restored: false, + }; + } + if (!targetStore || typeof targetStore.importSnapshot !== "function") { + return { + ...prepared, + ok: false, + reason: "target-store-import-unavailable", + restored: false, + }; + } + if (typeof targetStore.open === "function") { + await targetStore.open(); + } + if (typeof options.beforeImport === "function") { + await options.beforeImport(prepared.snapshot); + } + const importResult = await targetStore.importSnapshot(prepared.snapshot, { + mode: "replace", + preserveRevision: true, + revision: prepared.snapshot.meta.revision, + markSyncDirty: options.markSyncDirty === true, + }); + prepared.snapshot.meta.revision = Math.max( + normalizeOptionalInteger(importResult?.revision) ?? 0, + normalizeOptionalInteger(prepared.snapshot.meta.revision) ?? 0, + ); + return { + ...prepared, + ok: true, + restored: true, + revision: prepared.snapshot.meta.revision, + importResult: clonePlain(importResult, importResult), + }; +} + +export function buildAuthorityConsistencyAudit(input = {}) { + const source = input && typeof input === "object" && !Array.isArray(input) ? input : {}; + const updatedAt = String(source.updatedAt || new Date().toISOString()); + const chatId = normalizeChatId( + source.chatId || + source.runtimeGraph?.chatId || + source.graphPersistenceState?.chatId || + source.blobResult?.checkpoint?.chatId, + ); + const collectionId = normalizeChatId( + source.collectionId || + source.runtimeGraph?.vectorIndexState?.collectionId || + source.runtimeGraph?.runtimeState?.vectorIndexState?.collectionId, + ); + const sqlSnapshot = + source.sqlSnapshot && typeof source.sqlSnapshot === "object" && !Array.isArray(source.sqlSnapshot) + ? source.sqlSnapshot + : null; + const sqlError = normalizeErrorMessage(source.sqlError); + const sql = { + available: Boolean(sqlSnapshot), + ok: Boolean(sqlSnapshot) && !sqlError, + error: sqlError, + revision: readFirstInteger(sqlSnapshot, [["meta", "revision"]]), + nodeCount: readFirstInteger(sqlSnapshot, [["meta", "nodeCount"]]) ?? + (Array.isArray(sqlSnapshot?.nodes) ? sqlSnapshot.nodes.length : null), + edgeCount: readFirstInteger(sqlSnapshot, [["meta", "edgeCount"]]) ?? + (Array.isArray(sqlSnapshot?.edges) ? sqlSnapshot.edges.length : null), + tombstoneCount: readFirstInteger(sqlSnapshot, [["meta", "tombstoneCount"]]) ?? + (Array.isArray(sqlSnapshot?.tombstones) ? sqlSnapshot.tombstones.length : null), + lastModified: readFirstString(sqlSnapshot, [["meta", "lastModified"], ["meta", "updatedAt"]]), + }; + + const blobError = normalizeErrorMessage(source.blobError || source.blobResult?.error); + const blobCheckpoint = + source.blobResult?.checkpoint && + typeof source.blobResult.checkpoint === "object" && + !Array.isArray(source.blobResult.checkpoint) + ? source.blobResult.checkpoint + : null; + const blob = { + available: source.blobResult != null, + ok: source.blobResult?.ok !== false && !blobError, + error: blobError, + exists: Boolean(source.blobResult?.exists && blobCheckpoint), + path: String( + source.blobResult?.path || source.graphPersistenceState?.authorityBlobCheckpointPath || "", + ).trim(), + revision: + readFirstInteger(blobCheckpoint, [["revision"]]) ?? + normalizeOptionalInteger(source.graphPersistenceState?.authorityBlobCheckpointRevision), + chatId: normalizeChatId(blobCheckpoint?.chatId), + persistedAt: readFirstString(blobCheckpoint, [["persistedAt"], ["updatedAt"]]), + hasSerializedGraph: Boolean( + String(blobCheckpoint?.serializedGraph || "").trim() || + (blobCheckpoint?.graph && typeof blobCheckpoint.graph === "object"), + ), + }; + + const triviumSource = + source.triviumStat && typeof source.triviumStat === "object" && !Array.isArray(source.triviumStat) + ? source.triviumStat + : null; + const triviumError = normalizeErrorMessage(source.triviumError); + const trivium = { + available: Boolean(triviumSource), + ok: Boolean(triviumSource) && !triviumError, + error: triviumError, + revision: readFirstInteger(triviumSource, [ + ["revision"], + ["graphRevision"], + ["result", "revision"], + ["result", "graphRevision"], + ["stats", "revision"], + ["meta", "revision"], + ]), + itemCount: readFirstInteger(triviumSource, [ + ["itemCount"], + ["count"], + ["total"], + ["vectorCount"], + ["documentCount"], + ["result", "itemCount"], + ["result", "count"], + ["result", "total"], + ["stats", "itemCount"], + ["stats", "count"], + ]), + linkCount: readFirstInteger(triviumSource, [ + ["linkCount"], + ["edgeCount"], + ["relationCount"], + ["result", "linkCount"], + ["result", "edgeCount"], + ["stats", "linkCount"], + ]), + namespace: readFirstString(triviumSource, [ + ["namespace"], + ["result", "namespace"], + ["collectionId"], + ["result", "collectionId"], + ]), + }; + + const runtimeGraph = + source.runtimeGraph && typeof source.runtimeGraph === "object" && !Array.isArray(source.runtimeGraph) + ? source.runtimeGraph + : {}; + const runtimePersistence = + source.graphPersistenceState && + typeof source.graphPersistenceState === "object" && + !Array.isArray(source.graphPersistenceState) + ? source.graphPersistenceState + : {}; + const runtime = { + revision: Math.max( + normalizeOptionalInteger(runtimeGraph?.meta?.revision) ?? 0, + normalizeOptionalInteger(runtimePersistence?.revision) ?? 0, + ), + nodeCount: Array.isArray(runtimeGraph?.nodes) ? runtimeGraph.nodes.length : null, + edgeCount: Array.isArray(runtimeGraph?.edges) ? runtimeGraph.edges.length : null, + collectionId, + vectorDirty: Boolean(runtimeGraph?.vectorIndexState?.dirty), + lastJobId: String( + source.lastJob?.id || runtimePersistence?.authorityLastJobId || "", + ).trim(), + lastJobStatus: String( + source.lastJob?.status || runtimePersistence?.authorityLastJobStatus || "", + ).trim(), + }; + + const drift = { + runtimeVsSqlRevision: buildRevisionDelta(runtime.revision, sql.revision), + runtimeVsBlobRevision: buildRevisionDelta(runtime.revision, blob.revision), + sqlVsBlobRevision: buildRevisionDelta(sql.revision, blob.revision), + triviumVsSqlRevision: buildRevisionDelta(trivium.revision, sql.revision), + collectionMatchesRuntime: + !trivium.namespace || !runtime.collectionId || trivium.namespace === runtime.collectionId, + checkpointRestorable: + blob.exists && blob.hasSerializedGraph && (!blob.chatId || !chatId || blob.chatId === chatId), + }; + + const issues = []; + if (sql.error) { + issues.push(normalizeIssue("error", "sql-probe-error", `Authority SQL 探针失败:${sql.error}`)); + } + if (blob.error) { + issues.push(normalizeIssue("warning", "blob-probe-error", `Authority Blob 读取失败:${blob.error}`)); + } + if (trivium.error) { + issues.push(normalizeIssue("warning", "trivium-probe-error", `Authority Trivium 探针失败:${trivium.error}`)); + } + if (blob.exists && blob.chatId && chatId && blob.chatId !== chatId) { + issues.push(normalizeIssue("error", "blob-chat-mismatch", `Checkpoint chatId 不匹配:${blob.chatId} ≠ ${chatId}`)); + } + if ( + Number.isFinite(sql.revision) && + Number.isFinite(runtime.revision) && + sql.revision !== runtime.revision + ) { + issues.push( + normalizeIssue( + "warning", + "sql-runtime-revision-drift", + `SQL revision 与 runtime 不一致:${sql.revision} ≠ ${runtime.revision}`, + ), + ); + } + if ( + Number.isFinite(blob.revision) && + Number.isFinite(runtime.revision) && + blob.revision !== runtime.revision + ) { + issues.push( + normalizeIssue( + "warning", + "blob-runtime-revision-drift", + `Blob checkpoint revision 与 runtime 不一致:${blob.revision} ≠ ${runtime.revision}`, + ), + ); + } + if ( + Number.isFinite(trivium.revision) && + Number.isFinite(sql.revision) && + trivium.revision !== sql.revision + ) { + issues.push( + normalizeIssue( + "warning", + "trivium-sql-revision-drift", + `Trivium revision 与 SQL 不一致:${trivium.revision} ≠ ${sql.revision}`, + ), + ); + } + if (!drift.collectionMatchesRuntime) { + issues.push( + normalizeIssue( + "warning", + "trivium-collection-mismatch", + `Trivium collection/namespace 与 runtime 不一致:${trivium.namespace} ≠ ${runtime.collectionId}`, + ), + ); + } + if (runtime.vectorDirty) { + issues.push(normalizeIssue("warning", "vector-dirty", "当前向量索引仍处于 dirty 状态")); + } + if (!blob.exists && source.capability?.blobReady) { + issues.push(normalizeIssue("warning", "blob-checkpoint-missing", "Authority Blob 尚无可用 checkpoint")); + } + + const actions = []; + if (drift.checkpointRestorable) actions.push("restore-from-authority-blob-checkpoint"); + if (runtime.vectorDirty || (Number.isFinite(drift.triviumVsSqlRevision) && drift.triviumVsSqlRevision < 0)) { + actions.push("rebuild-authority-trivium"); + } + if (!blob.exists && source.capability?.blobReady) { + actions.push("write-authority-checkpoint"); + } + if (issues.some((issue) => issue.code === "sql-runtime-revision-drift" || issue.code === "blob-runtime-revision-drift")) { + actions.push("run-authority-consistency-audit"); + } + + const level = issues.some((issue) => issue.severity === "error") + ? "error" + : issues.length + ? "warning" + : sql.available || blob.available || trivium.available + ? "success" + : "idle"; + const label = + level === "error" + ? "存在阻塞性不一致" + : level === "warning" + ? "存在待处理漂移" + : level === "success" + ? "Authority 工件已对齐" + : "等待审计"; + const detail = issues[0]?.message || (level === "success" + ? "Authority SQL / Trivium / Blob 已达到当前可观测的一致状态" + : "尚未运行审计"); + + return { + updatedAt, + chatId, + collectionId, + sql, + trivium, + blob, + runtime, + drift, + issues, + actions, + summary: { + level, + label, + detail, + issueCount: issues.length, + }, + }; +} diff --git a/tests/authority-consistency.mjs b/tests/authority-consistency.mjs new file mode 100644 index 0000000..64f3569 --- /dev/null +++ b/tests/authority-consistency.mjs @@ -0,0 +1,149 @@ +import assert from "node:assert/strict"; + +import { createEmptyGraph, serializeGraph } from "../graph/graph.js"; +import { + applyAuthorityCheckpointToStore, + buildAuthorityCheckpointImportSnapshot, + buildAuthorityConsistencyAudit, +} from "../maintenance/authority-consistency.js"; + +const graph = createEmptyGraph(); +graph.chatId = "chat-a"; +graph.meta = { ...(graph.meta || {}), chatId: "chat-a", revision: 7 }; +graph.nodes.push({ + id: "node-a", + type: "memory", + seq: 1, + seqRange: [1, 1], + fields: { title: "Node A" }, + updatedAt: Date.now(), +}); + +const checkpoint = { + chatId: "chat-a", + revision: 7, + integrity: "integrity-a", + persistedAt: "2026-04-28T08:00:00.000Z", + serializedGraph: serializeGraph(graph), +}; + +const prepared = buildAuthorityCheckpointImportSnapshot(checkpoint, { + path: "user/files/checkpoint.json", + source: "authority-blob-restore", +}); +assert.equal(prepared.ok, true); +assert.equal(prepared.snapshot.meta.chatId, "chat-a"); +assert.equal(prepared.snapshot.meta.revision, 7); +assert.equal(prepared.snapshot.meta.authorityCheckpointPath, "user/files/checkpoint.json"); +assert.equal(prepared.snapshot.nodes.length, 1); + +const missingSerialized = buildAuthorityCheckpointImportSnapshot({ + chatId: "chat-a", + revision: 7, +}); +assert.equal(missingSerialized.ok, false); +assert.equal(missingSerialized.reason, "checkpoint-serialized-graph-missing"); + +const imported = []; +const restoreResult = await applyAuthorityCheckpointToStore( + { + async open() { + return true; + }, + async importSnapshot(snapshot, options) { + imported.push({ snapshot, options }); + return { revision: snapshot.meta.revision, imported: { nodes: snapshot.nodes.length } }; + }, + }, + checkpoint, + { markSyncDirty: false }, +); +assert.equal(restoreResult.ok, true); +assert.equal(restoreResult.restored, true); +assert.equal(imported.length, 1); +assert.equal(imported[0].options.mode, "replace"); +assert.equal(imported[0].options.markSyncDirty, false); + +const auditAligned = buildAuthorityConsistencyAudit({ + updatedAt: "2026-04-28T08:20:00.000Z", + chatId: "chat-a", + collectionId: "st-bme::chat-a", + capability: { + blobReady: true, + }, + runtimeGraph: { + meta: { revision: 7 }, + nodes: [{ id: "node-a" }], + edges: [], + vectorIndexState: { + collectionId: "st-bme::chat-a", + dirty: false, + }, + }, + graphPersistenceState: { + chatId: "chat-a", + revision: 7, + authorityBlobCheckpointPath: "user/files/checkpoint.json", + }, + sqlSnapshot: { + meta: { revision: 7, nodeCount: 1, edgeCount: 0, tombstoneCount: 0 }, + nodes: [], + edges: [], + tombstones: [], + }, + triviumStat: { + revision: 7, + itemCount: 1, + linkCount: 0, + namespace: "st-bme::chat-a", + }, + blobResult: { + ok: true, + exists: true, + path: "user/files/checkpoint.json", + checkpoint, + }, +}); +assert.equal(auditAligned.summary.level, "success"); +assert.equal(auditAligned.issues.length, 0); +assert.equal(auditAligned.drift.checkpointRestorable, true); +assert.ok(auditAligned.actions.includes("restore-from-authority-blob-checkpoint")); + +const auditDrift = buildAuthorityConsistencyAudit({ + chatId: "chat-a", + collectionId: "st-bme::chat-a", + capability: { + blobReady: true, + }, + runtimeGraph: { + meta: { revision: 9 }, + nodes: [], + edges: [], + vectorIndexState: { + collectionId: "st-bme::chat-a", + dirty: true, + }, + }, + graphPersistenceState: { + chatId: "chat-a", + revision: 9, + }, + sqlSnapshot: { + meta: { revision: 8, nodeCount: 1, edgeCount: 0, tombstoneCount: 0 }, + }, + triviumStat: { + revision: 7, + namespace: "st-bme::chat-a", + }, + blobResult: { + ok: true, + exists: false, + path: "user/files/checkpoint.json", + }, +}); +assert.equal(auditDrift.summary.level, "warning"); +assert.ok(auditDrift.issues.some((issue) => issue.code === "sql-runtime-revision-drift")); +assert.ok(auditDrift.issues.some((issue) => issue.code === "vector-dirty")); +assert.ok(auditDrift.actions.includes("rebuild-authority-trivium")); + +console.log("authority-consistency tests passed"); diff --git a/ui/panel.js b/ui/panel.js index 47f6438..ab9fbd8 100644 --- a/ui/panel.js +++ b/ui/panel.js @@ -3043,6 +3043,67 @@ function _refreshTaskPersistence() { const extractionCountLabel = Number.isFinite(Number(historyState?.extractionCount)) ? String(Number(historyState.extractionCount)) : "0"; + const authorityAudit = + ps.authorityConsistencyAudit && + typeof ps.authorityConsistencyAudit === "object" && + !Array.isArray(ps.authorityConsistencyAudit) + ? ps.authorityConsistencyAudit + : null; + const authorityAuditSummary = authorityAudit?.summary || { + level: String(ps.authorityConsistencyState || "idle"), + label: + ps.authorityConsistencyState === "success" + ? "Authority 工件已对齐" + : ps.authorityConsistencyState === "warning" + ? "存在待处理漂移" + : ps.authorityConsistencyState === "error" + ? "Authority 审计失败" + : ps.authorityConsistencyState === "running" + ? "Authority 审计中" + : "等待审计", + detail: String(ps.authorityConsistencyError || "尚未运行一致性审计"), + }; + const authorityAuditSqlRevision = Number.isFinite(Number(authorityAudit?.sql?.revision)) + ? String(Number(authorityAudit.sql.revision)) + : "—"; + const authorityAuditTriviumRevision = Number.isFinite(Number(authorityAudit?.trivium?.revision)) + ? String(Number(authorityAudit.trivium.revision)) + : "—"; + const authorityAuditBlobRevision = Number.isFinite(Number(authorityAudit?.blob?.revision)) + ? String(Number(authorityAudit.blob.revision)) + : Number.isFinite(Number(ps.authorityBlobCheckpointRevision)) && Number(ps.authorityBlobCheckpointRevision) > 0 + ? String(Number(ps.authorityBlobCheckpointRevision)) + : "—"; + const authorityAuditBlobPath = String( + authorityAudit?.blob?.path || ps.authorityBlobCheckpointPath || "", + ).trim() || "—"; + const authorityAuditIssuesLabel = Array.isArray(authorityAudit?.issues) && authorityAudit.issues.length + ? authorityAudit.issues.map((issue) => issue.message).filter(Boolean).join(" / ") + : authorityAuditSummary.detail || "—"; + const authorityAuditActionsLabel = Array.isArray(authorityAudit?.actions) && authorityAudit.actions.length + ? authorityAudit.actions.join(" · ") + : "—"; + const authorityAuditUpdatedLabel = ps.authorityConsistencyUpdatedAt + ? _formatTaskProfileTime(ps.authorityConsistencyUpdatedAt) + : "—"; + const authorityRestoreResult = + ps.authorityCheckpointRestoreResult && + typeof ps.authorityCheckpointRestoreResult === "object" && + !Array.isArray(ps.authorityCheckpointRestoreResult) + ? ps.authorityCheckpointRestoreResult + : null; + const authorityRestoreState = String(ps.authorityCheckpointRestoreState || "idle").trim(); + const authorityRestoreLabel = + authorityRestoreState === "success" + ? "已恢复" + : authorityRestoreState === "error" + ? "恢复失败" + : authorityRestoreState === "running" + ? "恢复中" + : "未执行"; + const authorityRestoreUpdatedLabel = ps.authorityCheckpointRestoreUpdatedAt + ? _formatTaskProfileTime(ps.authorityCheckpointRestoreUpdatedAt) + : "—"; const activeRegionLabel = String( historyState?.activeRegion || historyState?.lastExtractedRegion || @@ -3138,6 +3199,26 @@ function _refreshTaskPersistence() { ..._buildLoadDiagnosticRows(loadDiagnostics), ..._buildPersistDeltaDiagnosticRows(persistDeltaDiagnostics), ); + const authorityRows = [ + ["审计状态", authorityAuditSummary.label], + ["SQL rev", authorityAuditSqlRevision], + ["Trivium rev", authorityAuditTriviumRevision], + ["Blob rev", authorityAuditBlobRevision], + ["Blob path", authorityAuditBlobPath], + ["建议动作", authorityAuditActionsLabel], + ["最近审计", authorityAuditUpdatedLabel], + ["恢复状态", authorityRestoreLabel], + ["恢复结果", authorityRestoreResult?.revision ? `rev ${Number(authorityRestoreResult.revision)}` : "—"], + ["最近恢复", authorityRestoreUpdatedLabel], + ]; + const authorityActionButtons = [ + typeof _actionHandlers.runAuthorityConsistencyAudit === "function" + ? `` + : "", + typeof _actionHandlers.restoreAuthorityCheckpoint === "function" + ? `` + : "", + ].filter(Boolean).join(""); el.innerHTML = `
@@ -3167,7 +3248,58 @@ function _refreshTaskPersistence() { ${renderRows(runtimeRows)}
+
+
Authority 一致性 / Checkpoint
+
+ 审计当前 chat 的 Authority SQL / Trivium / Blob checkpoint 是否同 revision 前进;restore 会把 Blob checkpoint 回灌到 Authority SQL,并在 Authority 主存储启用时触发当前聊天重载。 +
+ ${authorityActionButtons ? `
${authorityActionButtons}
` : ""} + ${renderRowsTwoColumn(authorityRows)} +
${_escHtml(authorityAuditSummary.detail || "—")}
+
${_escHtml(authorityAuditIssuesLabel)}
+ ${ps.authorityCheckpointRestoreError ? `
${_escHtml(ps.authorityCheckpointRestoreError)}
` : ""} +
`; + + el + .querySelectorAll('[data-authority-persistence-action]') + .forEach((buttonEl) => buttonEl.addEventListener("click", async (event) => { + const button = event.currentTarget; + const action = String(button?.dataset?.authorityPersistenceAction || "").trim(); + if (button.disabled) return; + button.disabled = true; + try { + if (action === "audit") { + if (typeof _actionHandlers.runAuthorityConsistencyAudit !== "function") return; + toastr.info("Authority 一致性审计中…", "ST-BME", { timeOut: 2000 }); + const result = await _actionHandlers.runAuthorityConsistencyAudit(); + if (result?.success) { + toastr.success(result?.audit?.summary?.label || "Authority 审计完成", "ST-BME"); + } else { + toastr.warning(`Authority 审计失败:${result?.error || "unknown"}`, "ST-BME"); + } + } else if (action === "restore") { + if (typeof _actionHandlers.restoreAuthorityCheckpoint !== "function") return; + toastr.info("Authority Checkpoint 恢复中…", "ST-BME", { timeOut: 2000 }); + const result = await _actionHandlers.restoreAuthorityCheckpoint(); + if (result?.success) { + toastr.success(`Authority Checkpoint 已恢复:rev ${Number(result?.result?.revision || 0) || "?"}`, "ST-BME"); + } else { + toastr.warning(`Authority Checkpoint 恢复失败:${result?.error || "unknown"}`, "ST-BME"); + } + } + } catch (error) { + toastr.error( + action === "restore" + ? `Authority Checkpoint 恢复失败: ${error?.message || error}` + : `Authority 审计失败: ${error?.message || error}`, + "ST-BME", + ); + } finally { + button.disabled = false; + _refreshTaskPersistence(); + } + })); } // ==================== 图谱视图切换 ==================== diff --git a/ui/ui-status.js b/ui/ui-status.js index 8304770..318c412 100644 --- a/ui/ui-status.js +++ b/ui/ui-status.js @@ -169,6 +169,17 @@ export function createGraphPersistenceState() { authorityLastBlobReason: "", authorityLastBlobError: "", authorityLastBlobUpdatedAt: "", + authorityBlobCheckpointPath: "", + authorityBlobCheckpointRevision: 0, + authorityBlobCheckpointUpdatedAt: "", + authorityConsistencyState: "idle", + authorityConsistencyAudit: null, + authorityConsistencyUpdatedAt: "", + authorityConsistencyError: "", + authorityCheckpointRestoreState: "idle", + authorityCheckpointRestoreResult: null, + authorityCheckpointRestoreUpdatedAt: "", + authorityCheckpointRestoreError: "", authorityDiagnosticsBundlePath: "", authorityDiagnosticsBundleReason: "", authorityDiagnosticsBundleUpdatedAt: "",