diff --git a/index.js b/index.js index 249dcba..12fd08e 100644 --- a/index.js +++ b/index.js @@ -394,6 +394,7 @@ import { applyAuthorityCheckpointToStore, buildAuthorityConsistencyRepairPlan, buildAuthorityConsistencyAudit, + isAuthorityReplicaSyncRepairAction, } from "./maintenance/authority-consistency.js"; import { createAuthorityBlobAdapter, @@ -3363,12 +3364,19 @@ async function writeAuthorityCheckpointFromCurrentGraph(options = {}) { const reason = String(options.reason || "manual-authority-checkpoint"); const authoritySqlPrimary = shouldUseAuthorityGraphStore(settings, capability); + const authoritySqlCanonical = + authoritySqlPrimary || + [ + graphPersistenceState.acceptedBy, + graphPersistenceState.acceptedStorageTier, + graphPersistenceState.primaryStorageTier, + ].some((value) => String(value || "").trim() === "authority-sql"); let checkpointGraph = null; let revision = 0; let integrity = ""; let checkpointSource = "runtime"; - if (authoritySqlPrimary) { + if (authoritySqlCanonical) { try { const sqlSnapshot = await exportAuthoritySqlSnapshotForCheckpoint(chatId, settings); const sqlRevision = Number(sqlSnapshot?.meta?.revision || 0); @@ -3422,7 +3430,7 @@ async function writeAuthorityCheckpointFromCurrentGraph(options = {}) { chatId, integrity, reason, - storageTier: authoritySqlPrimary ? "authority-sql-primary" : "runtime-checkpoint", + storageTier: authoritySqlCanonical ? "authority-sql-primary" : "runtime-checkpoint", persistedAt: updatedAt, }); if (!checkpoint) { @@ -3668,6 +3676,7 @@ async function runAuthorityConsistencyRepairPlan(options = {}) { try { const stepResults = []; let handoffRequired = false; + let nonBlockingFailureCount = 0; for (const step of plan.steps) { let stepOutcome = null; if (step.action === "write-authority-checkpoint") { @@ -3738,6 +3747,11 @@ async function runAuthorityConsistencyRepairPlan(options = {}) { const latestStep = stepResults[stepResults.length - 1]; if (!latestStep?.success) { + const canContinueAfterFailure = isAuthorityReplicaSyncRepairAction(latestStep?.action); + if (canContinueAfterFailure) { + nonBlockingFailureCount += 1; + continue; + } const failedResult = { plan, steps: stepResults, @@ -3775,23 +3789,45 @@ async function runAuthorityConsistencyRepairPlan(options = {}) { collectionId: options.collectionId, }).catch(() => null); const finishedAt = new Date().toISOString(); + const successStepCount = stepResults.filter((step) => step?.success).length; + const failedStepCount = stepResults.filter((step) => !step?.success).length; + const partialFailure = failedStepCount > 0 && successStepCount > 0; + const allFailed = failedStepCount > 0 && successStepCount === 0 && !handoffRequired; + const outcome = allFailed + ? "error" + : handoffRequired + ? partialFailure ? "warning" : "running" + : partialFailure + ? "warning" + : "success"; const repairResult = { plan, steps: stepResults, auditSummary: audit.summary || null, handoffRequired, + outcome, + partialFailure, + failedStepCount, + nonBlockingFailureCount, finalAuditSummary: finalAuditResult?.audit?.summary || null, finalAuditDrift: finalAuditResult?.audit?.drift || null, }; updateGraphPersistenceState({ - authorityRepairState: handoffRequired ? "running" : "success", + authorityRepairState: outcome, authorityRepairUpdatedAt: finishedAt, - authorityRepairError: "", + authorityRepairError: allFailed + ? stepResults.find((step) => !step?.success)?.error || "Authority 副本同步失败" + : partialFailure + ? "部分副本同步失败;已继续执行其它可独立同步步骤" + : "", authorityRepairResult: cloneRuntimeDebugValue(repairResult, null), }); refreshPanelLiveState(); return { - success: true, + success: !allFailed, + outcome, + partialFailure, + error: allFailed ? stepResults.find((step) => !step?.success)?.error || "Authority 副本同步失败" : "", plan, results: stepResults, audit: finalAuditResult?.audit || audit, diff --git a/maintenance/authority-consistency.js b/maintenance/authority-consistency.js index 9ed4a67..10eea98 100644 --- a/maintenance/authority-consistency.js +++ b/maintenance/authority-consistency.js @@ -81,6 +81,13 @@ function normalizeRepairAction(value = "") { return String(value || "").trim(); } +export function isAuthorityReplicaSyncRepairAction(action = "") { + return [ + "write-authority-checkpoint", + "rebuild-authority-trivium", + ].includes(normalizeRepairAction(action)); +} + function collectIssueCodes(audit = null) { return new Set( (Array.isArray(audit?.issues) ? audit.issues : []) @@ -577,9 +584,11 @@ export function buildAuthorityConsistencyAudit(input = {}) { const detail = issues[0]?.message || (level === "success" ? "Authority SQL / Trivium / Blob 已达到当前可观测的一致状态" : "尚未运行审计"); - const replicaLag = issues.some((issue) => [ + const backupLag = issues.some((issue) => [ "blob-checkpoint-missing", "blob-checkpoint-behind", + ].includes(issue.code)); + const searchLag = issues.some((issue) => [ "trivium-replica-behind", "vector-dirty", ].includes(issue.code)); @@ -590,7 +599,7 @@ export function buildAuthorityConsistencyAudit(input = {}) { const dataSafety = sql.ok ? runtimeAheadOfSql ? "runtime-ahead-of-sql" - : replicaLag + : (backupLag || searchLag) ? "saved-replicas-behind" : "saved" : (sql.available ? "unknown" : "unavailable"); @@ -612,8 +621,8 @@ export function buildAuthorityConsistencyAudit(input = {}) { detail, issueCount: issues.length, dataSafety, - backupRedundancy: replicaLag ? "degraded" : (blob.exists ? "ok" : "unknown"), - searchQuality: runtime.vectorDirty || drift.sqlNewerThanTrivium ? "degraded" : "ok", + backupRedundancy: backupLag ? "degraded" : (blob.exists ? "ok" : "unknown"), + searchQuality: searchLag || drift.sqlNewerThanTrivium ? "degraded" : "ok", }, }; } diff --git a/tests/authority-consistency.mjs b/tests/authority-consistency.mjs index 6a71f6e..f11f041 100644 --- a/tests/authority-consistency.mjs +++ b/tests/authority-consistency.mjs @@ -6,6 +6,7 @@ import { buildAuthorityCheckpointImportSnapshot, buildAuthorityConsistencyAudit, buildAuthorityConsistencyRepairPlan, + isAuthorityReplicaSyncRepairAction, } from "../maintenance/authority-consistency.js"; const graph = createEmptyGraph(); @@ -269,4 +270,41 @@ assert.equal(auditRuntimeAheadOfSql.summary.level, "warning"); assert.equal(auditRuntimeAheadOfSql.summary.dataSafety, "runtime-ahead-of-sql"); assert.equal(auditRuntimeAheadOfSql.actions.includes("restore-from-authority-blob-checkpoint"), false); +const auditVectorDirtyOnly = buildAuthorityConsistencyAudit({ + chatId: "chat-a", + collectionId: "st-bme::chat-a", + runtimeGraph: { + meta: { revision: 5 }, + nodes: [{ id: "node-a" }], + edges: [], + vectorIndexState: { collectionId: "st-bme::chat-a", dirty: true }, + }, + graphPersistenceState: { + chatId: "chat-a", + revision: 5, + }, + sqlSnapshot: { + meta: { revision: 5, nodeCount: 1, edgeCount: 0, tombstoneCount: 0 }, + }, + triviumStat: { + revision: 5, + namespace: "st-bme::chat-a", + }, + blobResult: { + ok: true, + exists: true, + path: "user/files/checkpoint.json", + checkpoint: { + chatId: "chat-a", + revision: 5, + serializedGraph: serializeGraph(graph), + }, + }, +}); +assert.equal(auditVectorDirtyOnly.summary.backupRedundancy, "ok"); +assert.equal(auditVectorDirtyOnly.summary.searchQuality, "degraded"); +assert.equal(isAuthorityReplicaSyncRepairAction("write-authority-checkpoint"), true); +assert.equal(isAuthorityReplicaSyncRepairAction("rebuild-authority-trivium"), true); +assert.equal(isAuthorityReplicaSyncRepairAction("restore-from-authority-blob-checkpoint"), false); + console.log("authority-consistency tests passed"); diff --git a/tests/graph-persistence.mjs b/tests/graph-persistence.mjs index dd82cea..855f60f 100644 --- a/tests/graph-persistence.mjs +++ b/tests/graph-persistence.mjs @@ -1740,6 +1740,9 @@ result = { getAuthoritySnapshotForChat(chatId) { return globalThis.__getAuthoritySnapshotForChat(chatId); }, + setAuthoritySnapshotForChat(chatId, snapshot) { + return globalThis.__setAuthoritySnapshotForChat(chatId, snapshot); + }, getAuthorityBlobWrites() { return Array.from(globalThis.__authorityBlobWrites.entries()).map(([path, payload]) => [ path, @@ -4812,6 +4815,19 @@ result = { const checkpointGraph = deserializeGraph(checkpointPayload?.serializedGraph || "{}"); assert.equal(checkpointGraph.nodes[0]?.fields?.title, "事件-luker-authority-sql"); assert.notEqual(checkpointGraph.nodes[0]?.fields?.title, "事件-runtime-stale-checkpoint"); + + harness.api.setAuthoritySnapshotForChat(persistenceChatId, null); + const writeCountBeforeFailedCheckpoint = globalThis.__authorityBlobWrites.size; + const failedCheckpointResult = await harness.api.writeAuthorityCheckpointFromCurrentGraph({ + reason: "authority-sql-checkpoint-source-missing-test", + }); + assert.equal(failedCheckpointResult.success, false); + assert.equal(failedCheckpointResult.error, "authority-sql-checkpoint-source-empty"); + assert.equal( + globalThis.__authorityBlobWrites.size, + writeCountBeforeFailedCheckpoint, + "Authority SQL canonical checkpoint must fail instead of writing stale runtime graph", + ); } { diff --git a/ui/panel.js b/ui/panel.js index 8c7d188..643b8cc 100644 --- a/ui/panel.js +++ b/ui/panel.js @@ -3157,6 +3157,8 @@ function _refreshTaskPersistence() { ? "同步完成" : authorityRepairState === "error" ? "同步失败" + : authorityRepairState === "warning" + ? "部分同步失败" : authorityRepairState === "running" ? authorityRepairResult?.handoffRequired ? "等待 Job 交接" @@ -3529,7 +3531,9 @@ function _refreshTaskPersistence() { const result = await _actionHandlers.runAuthorityConsistencyRepairPlan(); if (result?.success) { const stepCount = Number(result?.repairResult?.steps?.length || result?.results?.length || 0); - if (result?.handoffRequired || result?.repairResult?.handoffRequired) { + if (result?.partialFailure || result?.repairResult?.partialFailure || result?.outcome === "warning" || result?.repairResult?.outcome === "warning") { + toastr.warning(`Authority 副本部分同步失败;记忆图谱不受影响${stepCount > 0 ? `(${stepCount} 步)` : ""}`, "ST-BME"); + } else if (result?.handoffRequired || result?.repairResult?.handoffRequired) { toastr.success(`Authority 副本同步已交接异步 Job${stepCount > 0 ? `(${stepCount} 步)` : ""}`, "ST-BME"); } else { toastr.success(`Authority 副本同步已完成${stepCount > 0 ? `(${stepCount} 步)` : ""}`, "ST-BME");