Files
ST-Bionic-Memory-Ecology/maintenance/authority-consistency.js
2026-04-28 19:35:06 +08:00

563 lines
18 KiB
JavaScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import { deserializeGraph } from "../graph/graph.js";
import { normalizeGraphRuntimeState } from "../runtime/runtime-state.js";
import { buildSnapshotFromGraph } from "../sync/bme-db.js";
function clonePlain(value, fallbackValue = null) {
if (value == null) return fallbackValue;
if (typeof globalThis.structuredClone === "function") {
try {
return globalThis.structuredClone(value);
} catch {
}
}
try {
return JSON.parse(JSON.stringify(value));
} catch {
return fallbackValue;
}
}
function normalizeChatId(value) {
return String(value ?? "").trim();
}
function normalizeOptionalInteger(value) {
const parsed = Number(value);
if (!Number.isFinite(parsed)) return null;
return Math.max(0, Math.floor(parsed));
}
function normalizeIssue(severity, code, message) {
return {
severity,
code: String(code || "unknown"),
message: String(message || ""),
};
}
function readNestedValue(source = null, path = []) {
let current = source;
for (const key of path) {
if (!current || typeof current !== "object" || Array.isArray(current)) {
return undefined;
}
current = current[key];
}
return current;
}
function readFirstValue(source = null, candidates = []) {
for (const candidate of candidates) {
const path = Array.isArray(candidate) ? candidate : [candidate];
const value = readNestedValue(source, path);
if (value !== undefined && value !== null && value !== "") {
return value;
}
}
return undefined;
}
function readFirstInteger(source = null, candidates = []) {
const value = readFirstValue(source, candidates);
return normalizeOptionalInteger(value);
}
function readFirstString(source = null, candidates = []) {
const value = readFirstValue(source, candidates);
return value == null ? "" : String(value || "").trim();
}
function normalizeErrorMessage(error = null) {
if (!error) return "";
return String(error?.message || error || "").trim();
}
function buildRevisionDelta(left = null, right = null) {
if (!Number.isFinite(left) || !Number.isFinite(right)) return null;
return Number(left) - Number(right);
}
function normalizeRepairAction(value = "") {
return String(value || "").trim();
}
function collectIssueCodes(audit = null) {
return new Set(
(Array.isArray(audit?.issues) ? audit.issues : [])
.map((issue) => String(issue?.code || "").trim())
.filter(Boolean),
);
}
export function buildAuthorityConsistencyRepairPlan(audit = null) {
const source = audit && typeof audit === "object" && !Array.isArray(audit) ? audit : {};
const actions = Array.isArray(source.actions)
? source.actions.map(normalizeRepairAction).filter(Boolean)
: [];
const issueCodes = collectIssueCodes(source);
const steps = [];
const addStep = (action, label, detail, codes = []) => {
const normalizedAction = normalizeRepairAction(action);
if (!normalizedAction || !actions.includes(normalizedAction)) {
return;
}
if (Array.isArray(codes) && codes.length > 0) {
const matched = codes.some((code) => issueCodes.has(String(code || "").trim()));
if (!matched) {
return;
}
}
if (steps.some((step) => step.action === normalizedAction)) {
return;
}
steps.push({
action: normalizedAction,
label: String(label || normalizedAction),
detail: String(detail || ""),
issueCodes: Array.isArray(codes) ? codes.map((code) => String(code || "").trim()).filter(Boolean) : [],
});
};
addStep(
"write-authority-checkpoint",
"写入当前 Checkpoint",
"Authority Blob 尚无 checkpoint先把当前 runtime 图谱写成 checkpoint再继续后续修复。",
["blob-checkpoint-missing"],
);
addStep(
"restore-from-authority-blob-checkpoint",
"从 Blob Checkpoint 恢复 SQL",
"检测到 runtime / SQL / Blob revision 漂移,可用 Blob checkpoint 回灌 Authority SQL。",
["sql-runtime-revision-drift", "blob-runtime-revision-drift"],
);
addStep(
"rebuild-authority-trivium",
"重建 Authority Trivium",
"Trivium 与 SQL revision 不一致,或当前向量索引仍为 dirty需要重建 Trivium。",
["trivium-sql-revision-drift", "trivium-collection-mismatch", "vector-dirty"],
);
const blockedIssueCodes = (Array.isArray(source.issues) ? source.issues : [])
.filter((issue) => String(issue?.severity || "") === "error")
.map((issue) => String(issue?.code || "").trim())
.filter(Boolean);
const unsupportedActions = actions.filter(
(action) => action !== "run-authority-consistency-audit" && !steps.some((step) => step.action === action),
);
const detail = steps.length
? `建议顺序:${steps.map((step) => step.label).join(" → ")}`
: String(source?.summary?.detail || "当前审计未发现需要自动编排的修复步骤");
return {
ok: steps.length > 0,
steps,
stepCount: steps.length,
requiresConfirmation: steps.some((step) => step.action === "restore-from-authority-blob-checkpoint"),
blockedIssueCodes,
unsupportedActions,
summary: {
level: steps.length > 0 ? "warning" : String(source?.summary?.level || "idle"),
label: steps.length > 0 ? `建议修复 ${steps.length}` : "当前无需编排修复",
detail,
},
};
}
export function buildAuthorityCheckpointImportSnapshot(checkpoint = null, options = {}) {
const normalizedCheckpoint =
checkpoint && typeof checkpoint === "object" && !Array.isArray(checkpoint)
? checkpoint
: null;
if (!normalizedCheckpoint) {
return {
ok: false,
reason: "checkpoint-missing",
snapshot: null,
};
}
const chatId = normalizeChatId(options.chatId || normalizedCheckpoint.chatId);
if (!chatId) {
return {
ok: false,
reason: "checkpoint-chat-id-missing",
snapshot: null,
};
}
const serializedGraph = String(normalizedCheckpoint.serializedGraph || "").trim();
const rawGraph =
normalizedCheckpoint.graph &&
typeof normalizedCheckpoint.graph === "object" &&
!Array.isArray(normalizedCheckpoint.graph)
? clonePlain(normalizedCheckpoint.graph, null)
: null;
if (!serializedGraph && !rawGraph) {
return {
ok: false,
reason: "checkpoint-serialized-graph-missing",
snapshot: null,
};
}
try {
const restoredGraph = normalizeGraphRuntimeState(
rawGraph || deserializeGraph(serializedGraph),
chatId,
);
const revision = Math.max(
0,
normalizeOptionalInteger(options.revision) ?? -1,
normalizeOptionalInteger(normalizedCheckpoint.revision) ?? -1,
);
const source = String(options.source || "authority-checkpoint-restore").trim() ||
"authority-checkpoint-restore";
const integrity = String(
normalizedCheckpoint.integrity || options.integrity || "",
).trim();
const snapshot = buildSnapshotFromGraph(restoredGraph, {
chatId,
revision,
lastModified: Date.now(),
meta: {
integrity,
storagePrimary: String(options.storagePrimary || "authority"),
storageMode: String(options.storageMode || "authority-sql-primary"),
lastMutationReason: source,
authorityCheckpointSource: source,
authorityCheckpointChatId: chatId,
authorityCheckpointRevision: revision,
authorityCheckpointPersistedAt: String(
normalizedCheckpoint.persistedAt || "",
),
authorityCheckpointPath: String(options.path || ""),
},
});
return {
ok: true,
reason: "checkpoint-import-snapshot-ready",
snapshot,
checkpoint: {
chatId,
revision,
integrity,
persistedAt: String(normalizedCheckpoint.persistedAt || ""),
hasSerializedGraph: Boolean(serializedGraph || rawGraph),
},
};
} catch (error) {
return {
ok: false,
reason: "checkpoint-invalid",
error,
snapshot: null,
};
}
}
export async function applyAuthorityCheckpointToStore(
targetStore,
checkpoint = null,
options = {},
) {
const prepared = buildAuthorityCheckpointImportSnapshot(checkpoint, options);
if (!prepared.ok || !prepared.snapshot) {
return {
...prepared,
restored: false,
};
}
if (!targetStore || typeof targetStore.importSnapshot !== "function") {
return {
...prepared,
ok: false,
reason: "target-store-import-unavailable",
restored: false,
};
}
if (typeof targetStore.open === "function") {
await targetStore.open();
}
if (typeof options.beforeImport === "function") {
await options.beforeImport(prepared.snapshot);
}
const importResult = await targetStore.importSnapshot(prepared.snapshot, {
mode: "replace",
preserveRevision: true,
revision: prepared.snapshot.meta.revision,
markSyncDirty: options.markSyncDirty === true,
});
prepared.snapshot.meta.revision = Math.max(
normalizeOptionalInteger(importResult?.revision) ?? 0,
normalizeOptionalInteger(prepared.snapshot.meta.revision) ?? 0,
);
return {
...prepared,
ok: true,
restored: true,
revision: prepared.snapshot.meta.revision,
importResult: clonePlain(importResult, importResult),
};
}
export function buildAuthorityConsistencyAudit(input = {}) {
const source = input && typeof input === "object" && !Array.isArray(input) ? input : {};
const updatedAt = String(source.updatedAt || new Date().toISOString());
const chatId = normalizeChatId(
source.chatId ||
source.runtimeGraph?.chatId ||
source.graphPersistenceState?.chatId ||
source.blobResult?.checkpoint?.chatId,
);
const collectionId = normalizeChatId(
source.collectionId ||
source.runtimeGraph?.vectorIndexState?.collectionId ||
source.runtimeGraph?.runtimeState?.vectorIndexState?.collectionId,
);
const sqlSnapshot =
source.sqlSnapshot && typeof source.sqlSnapshot === "object" && !Array.isArray(source.sqlSnapshot)
? source.sqlSnapshot
: null;
const sqlError = normalizeErrorMessage(source.sqlError);
const sql = {
available: Boolean(sqlSnapshot),
ok: Boolean(sqlSnapshot) && !sqlError,
error: sqlError,
revision: readFirstInteger(sqlSnapshot, [["meta", "revision"]]),
nodeCount: readFirstInteger(sqlSnapshot, [["meta", "nodeCount"]]) ??
(Array.isArray(sqlSnapshot?.nodes) ? sqlSnapshot.nodes.length : null),
edgeCount: readFirstInteger(sqlSnapshot, [["meta", "edgeCount"]]) ??
(Array.isArray(sqlSnapshot?.edges) ? sqlSnapshot.edges.length : null),
tombstoneCount: readFirstInteger(sqlSnapshot, [["meta", "tombstoneCount"]]) ??
(Array.isArray(sqlSnapshot?.tombstones) ? sqlSnapshot.tombstones.length : null),
lastModified: readFirstString(sqlSnapshot, [["meta", "lastModified"], ["meta", "updatedAt"]]),
};
const blobError = normalizeErrorMessage(source.blobError || source.blobResult?.error);
const blobCheckpoint =
source.blobResult?.checkpoint &&
typeof source.blobResult.checkpoint === "object" &&
!Array.isArray(source.blobResult.checkpoint)
? source.blobResult.checkpoint
: null;
const blob = {
available: source.blobResult != null,
ok: source.blobResult?.ok !== false && !blobError,
error: blobError,
exists: Boolean(source.blobResult?.exists && blobCheckpoint),
path: String(
source.blobResult?.path || source.graphPersistenceState?.authorityBlobCheckpointPath || "",
).trim(),
revision:
readFirstInteger(blobCheckpoint, [["revision"]]) ??
normalizeOptionalInteger(source.graphPersistenceState?.authorityBlobCheckpointRevision),
chatId: normalizeChatId(blobCheckpoint?.chatId),
persistedAt: readFirstString(blobCheckpoint, [["persistedAt"], ["updatedAt"]]),
hasSerializedGraph: Boolean(
String(blobCheckpoint?.serializedGraph || "").trim() ||
(blobCheckpoint?.graph && typeof blobCheckpoint.graph === "object"),
),
};
const triviumSource =
source.triviumStat && typeof source.triviumStat === "object" && !Array.isArray(source.triviumStat)
? source.triviumStat
: null;
const triviumError = normalizeErrorMessage(source.triviumError);
const trivium = {
available: Boolean(triviumSource),
ok: Boolean(triviumSource) && !triviumError,
error: triviumError,
revision: readFirstInteger(triviumSource, [
["revision"],
["graphRevision"],
["result", "revision"],
["result", "graphRevision"],
["stats", "revision"],
["meta", "revision"],
]),
itemCount: readFirstInteger(triviumSource, [
["itemCount"],
["count"],
["total"],
["vectorCount"],
["documentCount"],
["result", "itemCount"],
["result", "count"],
["result", "total"],
["stats", "itemCount"],
["stats", "count"],
]),
linkCount: readFirstInteger(triviumSource, [
["linkCount"],
["edgeCount"],
["relationCount"],
["result", "linkCount"],
["result", "edgeCount"],
["stats", "linkCount"],
]),
namespace: readFirstString(triviumSource, [
["namespace"],
["result", "namespace"],
["collectionId"],
["result", "collectionId"],
]),
};
const runtimeGraph =
source.runtimeGraph && typeof source.runtimeGraph === "object" && !Array.isArray(source.runtimeGraph)
? source.runtimeGraph
: {};
const runtimePersistence =
source.graphPersistenceState &&
typeof source.graphPersistenceState === "object" &&
!Array.isArray(source.graphPersistenceState)
? source.graphPersistenceState
: {};
const runtime = {
revision: Math.max(
normalizeOptionalInteger(runtimeGraph?.meta?.revision) ?? 0,
normalizeOptionalInteger(runtimePersistence?.revision) ?? 0,
),
nodeCount: Array.isArray(runtimeGraph?.nodes) ? runtimeGraph.nodes.length : null,
edgeCount: Array.isArray(runtimeGraph?.edges) ? runtimeGraph.edges.length : null,
collectionId,
vectorDirty: Boolean(runtimeGraph?.vectorIndexState?.dirty),
lastJobId: String(
source.lastJob?.id || runtimePersistence?.authorityLastJobId || "",
).trim(),
lastJobStatus: String(
source.lastJob?.status || runtimePersistence?.authorityLastJobStatus || "",
).trim(),
};
const drift = {
runtimeVsSqlRevision: buildRevisionDelta(runtime.revision, sql.revision),
runtimeVsBlobRevision: buildRevisionDelta(runtime.revision, blob.revision),
sqlVsBlobRevision: buildRevisionDelta(sql.revision, blob.revision),
triviumVsSqlRevision: buildRevisionDelta(trivium.revision, sql.revision),
collectionMatchesRuntime:
!trivium.namespace || !runtime.collectionId || trivium.namespace === runtime.collectionId,
checkpointRestorable:
blob.exists && blob.hasSerializedGraph && (!blob.chatId || !chatId || blob.chatId === chatId),
};
const issues = [];
if (sql.error) {
issues.push(normalizeIssue("error", "sql-probe-error", `Authority SQL 探针失败:${sql.error}`));
}
if (blob.error) {
issues.push(normalizeIssue("warning", "blob-probe-error", `Authority Blob 读取失败:${blob.error}`));
}
if (trivium.error) {
issues.push(normalizeIssue("warning", "trivium-probe-error", `Authority Trivium 探针失败:${trivium.error}`));
}
if (blob.exists && blob.chatId && chatId && blob.chatId !== chatId) {
issues.push(normalizeIssue("error", "blob-chat-mismatch", `Checkpoint chatId 不匹配:${blob.chatId}${chatId}`));
}
if (
Number.isFinite(sql.revision) &&
Number.isFinite(runtime.revision) &&
sql.revision !== runtime.revision
) {
issues.push(
normalizeIssue(
"warning",
"sql-runtime-revision-drift",
`SQL revision 与 runtime 不一致:${sql.revision}${runtime.revision}`,
),
);
}
if (
Number.isFinite(blob.revision) &&
Number.isFinite(runtime.revision) &&
blob.revision !== runtime.revision
) {
issues.push(
normalizeIssue(
"warning",
"blob-runtime-revision-drift",
`Blob checkpoint revision 与 runtime 不一致:${blob.revision}${runtime.revision}`,
),
);
}
if (
Number.isFinite(trivium.revision) &&
Number.isFinite(sql.revision) &&
trivium.revision !== sql.revision
) {
issues.push(
normalizeIssue(
"warning",
"trivium-sql-revision-drift",
`Trivium revision 与 SQL 不一致:${trivium.revision}${sql.revision}`,
),
);
}
if (!drift.collectionMatchesRuntime) {
issues.push(
normalizeIssue(
"warning",
"trivium-collection-mismatch",
`Trivium collection/namespace 与 runtime 不一致:${trivium.namespace}${runtime.collectionId}`,
),
);
}
if (runtime.vectorDirty) {
issues.push(normalizeIssue("warning", "vector-dirty", "当前向量索引仍处于 dirty 状态"));
}
if (!blob.exists && source.capability?.blobReady) {
issues.push(normalizeIssue("warning", "blob-checkpoint-missing", "Authority Blob 尚无可用 checkpoint"));
}
const actions = [];
if (drift.checkpointRestorable) actions.push("restore-from-authority-blob-checkpoint");
if (runtime.vectorDirty || (Number.isFinite(drift.triviumVsSqlRevision) && drift.triviumVsSqlRevision < 0)) {
actions.push("rebuild-authority-trivium");
}
if (!blob.exists && source.capability?.blobReady) {
actions.push("write-authority-checkpoint");
}
if (issues.some((issue) => issue.code === "sql-runtime-revision-drift" || issue.code === "blob-runtime-revision-drift")) {
actions.push("run-authority-consistency-audit");
}
const level = issues.some((issue) => issue.severity === "error")
? "error"
: issues.length
? "warning"
: sql.available || blob.available || trivium.available
? "success"
: "idle";
const label =
level === "error"
? "存在阻塞性不一致"
: level === "warning"
? "存在待处理漂移"
: level === "success"
? "Authority 工件已对齐"
: "等待审计";
const detail = issues[0]?.message || (level === "success"
? "Authority SQL / Trivium / Blob 已达到当前可观测的一致状态"
: "尚未运行审计");
return {
updatedAt,
chatId,
collectionId,
sql,
trivium,
blob,
runtime,
drift,
issues,
actions,
summary: {
level,
label,
detail,
issueCount: issues.length,
},
};
}