From 8979b896468000155c987b83caca49acb28dd878 Mon Sep 17 00:00:00 2001 From: Youzini-afk <13153778771cx@gmail.com> Date: Tue, 28 Apr 2026 19:34:28 +0800 Subject: [PATCH] feat(authority): harden jobs repair and diagnostics --- index.js | 489 ++++++++++++++++++++ maintenance/authority-consistency.js | 86 ++++ maintenance/authority-diagnostics-bundle.js | 26 +- maintenance/authority-job-adapter.js | 78 ++++ maintenance/authority-job-tracker.js | 120 ++++- package.json | 1 + tests/authority-consistency.mjs | 35 ++ tests/authority-diagnostics-bundle.mjs | 28 ++ tests/authority-jobs.mjs | 128 +++++ tests/perf/authority-recall-bench.mjs | 329 +++++++++++++ ui/panel.js | 84 ++++ ui/ui-status.js | 12 + 12 files changed, 1403 insertions(+), 13 deletions(-) create mode 100644 tests/perf/authority-recall-bench.mjs diff --git a/index.js b/index.js index 809c394..5c79ffd 100644 --- a/index.js +++ b/index.js @@ -379,6 +379,7 @@ import { import { trackAuthorityJobUntilTerminal } from "./maintenance/authority-job-tracker.js"; import { applyAuthorityCheckpointToStore, + buildAuthorityConsistencyRepairPlan, buildAuthorityConsistencyAudit, } from "./maintenance/authority-consistency.js"; import { @@ -386,6 +387,7 @@ import { normalizeAuthorityBlobConfig, } from "./maintenance/authority-blob-adapter.js"; import { + AUTHORITY_DIAGNOSTICS_MANIFEST_LIMIT, buildAuthorityDiagnosticsBundle, buildAuthorityDiagnosticsBundlePath, buildAuthorityDiagnosticsManifestPath, @@ -1826,6 +1828,15 @@ function getGraphPersistenceLiveState() { authorityLastJobUpdatedAt: String( graphPersistenceState.authorityLastJobUpdatedAt || "", ), + authorityJobTrackingMode: String( + graphPersistenceState.authorityJobTrackingMode || "idle", + ), + authorityJobTrackingReason: String( + graphPersistenceState.authorityJobTrackingReason || "", + ), + authorityJobTrackingUpdatedAt: String( + graphPersistenceState.authorityJobTrackingUpdatedAt || "", + ), authorityRecentJobs: cloneRuntimeDebugValue( graphPersistenceState.authorityRecentJobs, [], @@ -1891,6 +1902,15 @@ function getGraphPersistenceLiveState() { authorityCheckpointRestoreError: String( graphPersistenceState.authorityCheckpointRestoreError || "", ), + authorityRepairState: String(graphPersistenceState.authorityRepairState || "idle"), + authorityRepairResult: cloneRuntimeDebugValue( + graphPersistenceState.authorityRepairResult, + null, + ), + authorityRepairUpdatedAt: String( + graphPersistenceState.authorityRepairUpdatedAt || "", + ), + authorityRepairError: String(graphPersistenceState.authorityRepairError || ""), authorityPerformanceBaseline: cloneRuntimeDebugValue( graphPersistenceState.authorityPerformanceBaseline, null, @@ -2041,6 +2061,19 @@ function getGraphPersistenceLiveState() { authorityDiagnosticsArtifactsError: String( graphPersistenceState.authorityDiagnosticsArtifactsError || "", ), + authorityDiagnosticsRetentionLimit: Number( + graphPersistenceState.authorityDiagnosticsRetentionLimit || + AUTHORITY_DIAGNOSTICS_MANIFEST_LIMIT, + ), + authorityDiagnosticsLastPrunedCount: Number( + graphPersistenceState.authorityDiagnosticsLastPrunedCount || 0, + ), + authorityDiagnosticsLastPrunedAt: String( + graphPersistenceState.authorityDiagnosticsLastPrunedAt || "", + ), + authorityDiagnosticsLastPruneError: String( + graphPersistenceState.authorityDiagnosticsLastPruneError || "", + ), }; return cloneRuntimeDebugValue(snapshot, snapshot); @@ -2114,6 +2147,14 @@ function mergeAuthorityRecentJobsIntoState(incomingJobs = [], options = {}) { return nextRecentJobs; } +function setAuthorityJobTrackingState(mode = "idle", reason = "") { + updateGraphPersistenceState({ + authorityJobTrackingMode: String(mode || "idle"), + authorityJobTrackingReason: String(reason || ""), + authorityJobTrackingUpdatedAt: new Date().toISOString(), + }); +} + async function refreshAuthorityRecentJobs(options = {}) { const settings = getSettings(); const { capability } = getAuthorityRuntimeSnapshot(settings); @@ -2317,6 +2358,49 @@ function getAuthorityBlobAdapter(options = {}) { }); } +async function enforceAuthorityDiagnosticsRetention(adapter, prunedEntries = [], options = {}) { + const normalizedEntries = Array.isArray(prunedEntries) + ? prunedEntries.filter((entry) => entry && typeof entry === "object") + : []; + const results = []; + const errors = []; + for (const entry of normalizedEntries) { + const artifactPath = String(entry?.path || "").trim(); + if (!artifactPath) { + continue; + } + try { + const deleteResult = await adapter.delete(artifactPath, { + signal: options.signal, + }); + const ok = deleteResult?.ok !== false; + results.push({ + path: artifactPath, + ok, + deleted: deleteResult?.deleted === true, + missing: deleteResult?.missing === true, + }); + if (!ok) { + errors.push(`${artifactPath}: ${deleteResult?.error || deleteResult?.reason || "delete failed"}`); + } + } catch (error) { + const message = error?.message || String(error) || "delete failed"; + results.push({ + path: artifactPath, + ok: false, + error: message, + }); + errors.push(`${artifactPath}: ${message}`); + } + } + return { + ok: errors.length === 0, + count: results.filter((item) => item.ok !== false).length, + results, + error: errors.join(" | "), + }; +} + function buildAuthorityPerformanceBaselineSnapshot(options = {}) { const liveGraphPersistence = getGraphPersistenceLiveState(); return buildAuthorityPerformanceBaseline({ @@ -2444,6 +2528,20 @@ async function exportAuthorityDiagnosticsBundle(options = {}) { path: manifestPath, signal: options.signal, }).catch(() => null); + const retentionResult = manifestResult + ? await enforceAuthorityDiagnosticsRetention( + adapter, + manifestResult?.prunedEntries, + { + signal: options.signal, + }, + ) + : { + ok: true, + count: 0, + results: [], + error: "", + }; const nextArtifactEntries = manifestResult?.entries || [ manifestEntry, ...((Array.isArray(graphPersistenceState.authorityDiagnosticsArtifacts) @@ -2473,6 +2571,11 @@ async function exportAuthorityDiagnosticsBundle(options = {}) { manifestResult?.manifest?.updatedAt || updatedAt, ), authorityDiagnosticsArtifactsError: "", + authorityDiagnosticsRetentionLimit: AUTHORITY_DIAGNOSTICS_MANIFEST_LIMIT, + authorityDiagnosticsLastPrunedCount: Number(retentionResult?.count || 0), + authorityDiagnosticsLastPrunedAt: + Number(retentionResult?.count || 0) > 0 ? updatedAt : String(graphPersistenceState.authorityDiagnosticsLastPrunedAt || ""), + authorityDiagnosticsLastPruneError: String(retentionResult?.error || ""), }); if (options.refreshHost !== false) { refreshPanelLiveState(); @@ -2483,6 +2586,7 @@ async function exportAuthorityDiagnosticsBundle(options = {}) { size: bundleSize, baseline, bundle, + retention: retentionResult, }; } catch (error) { const message = @@ -2534,6 +2638,7 @@ async function refreshAuthorityDiagnosticsArtifacts(options = {}) { result?.manifest?.updatedAt || new Date().toISOString(), ), authorityDiagnosticsArtifactsError: "", + authorityDiagnosticsRetentionLimit: AUTHORITY_DIAGNOSTICS_MANIFEST_LIMIT, }); if (options.refreshHost !== false) { refreshPanelLiveState(); @@ -2549,6 +2654,7 @@ async function refreshAuthorityDiagnosticsArtifacts(options = {}) { authorityDiagnosticsManifestPath: manifestPath, authorityDiagnosticsArtifactsError: message, authorityDiagnosticsArtifactsUpdatedAt: new Date().toISOString(), + authorityDiagnosticsRetentionLimit: AUTHORITY_DIAGNOSTICS_MANIFEST_LIMIT, }); if (options.refreshHost !== false) { refreshPanelLiveState(); @@ -2632,6 +2738,7 @@ async function deleteAuthorityDiagnosticsArtifact(path = "", options = {}) { manifestResult?.manifest?.updatedAt || updatedAt, ), authorityDiagnosticsArtifactsError: "", + authorityDiagnosticsRetentionLimit: AUTHORITY_DIAGNOSTICS_MANIFEST_LIMIT, authorityDiagnosticsBundlePath: wasLatestArtifact ? "" : graphPersistenceState.authorityDiagnosticsBundlePath, authorityDiagnosticsBundleReason: wasLatestArtifact ? "" : graphPersistenceState.authorityDiagnosticsBundleReason, authorityDiagnosticsBundleUpdatedAt: wasLatestArtifact ? "" : graphPersistenceState.authorityDiagnosticsBundleUpdatedAt, @@ -3164,6 +3271,354 @@ async function writeAuthorityCheckpointFromCurrentGraph(options = {}) { }; } +async function rebuildAuthorityTrivium(options = {}) { + const vectorConfig = options.config || getEmbeddingConfig(); + const validation = validateVectorConfig(vectorConfig); + if (!validation.valid) { + return { + success: false, + error: validation.error || "Authority Trivium 配置无效", + }; + } + + const range = options.range || null; + const reason = String(options.reason || "authority-trivium-rebuild"); + if (!range && options.useJobs !== false && shouldUseAuthorityJobs(vectorConfig)) { + const jobResult = await submitAuthorityVectorRebuildJob({ + config: vectorConfig, + range, + purge: options.purge !== false, + signal: options.signal, + }); + if (jobResult?.submitted) { + saveGraphToChat({ reason: `${reason}-job-submitted` }); + return { + success: true, + submitted: true, + terminal: false, + mode: "job", + job: jobResult.job, + }; + } + if (jobResult?.error) { + const fallbackResult = await syncVectorState({ + force: true, + purge: isBackendVectorConfig(vectorConfig) || isAuthorityVectorConfig(vectorConfig), + range, + signal: options.signal, + }); + if (fallbackResult?.aborted) { + return { + success: false, + aborted: true, + error: "aborted", + }; + } + if (fallbackResult?.error) { + return { + success: false, + error: fallbackResult.error, + }; + } + saveGraphToChat({ reason: `${reason}-complete` }); + return { + success: true, + submitted: false, + terminal: true, + mode: "local-fallback", + fallbackError: jobResult.error, + result: fallbackResult, + stats: fallbackResult?.stats || getVectorIndexStats(currentGraph), + }; + } + } + + const result = await syncVectorState({ + force: true, + purge: !range && (isBackendVectorConfig(vectorConfig) || isAuthorityVectorConfig(vectorConfig)), + range, + signal: options.signal, + }); + if (result?.aborted) { + return { + success: false, + aborted: true, + error: "aborted", + }; + } + if (result?.error) { + return { + success: false, + error: result.error, + }; + } + saveGraphToChat({ reason: `${reason}-complete` }); + return { + success: true, + submitted: false, + terminal: true, + mode: "local", + result, + stats: result?.stats || getVectorIndexStats(currentGraph), + }; +} + +async function runAuthorityConsistencyRepairPlan(options = {}) { + const updatedAt = new Date().toISOString(); + const chatId = normalizeChatIdCandidate( + options.chatId || getCurrentChatId() || graphPersistenceState.chatId, + ); + if (!chatId) { + return { + success: false, + error: "missing-chat-id", + }; + } + + let audit = + options.audit && typeof options.audit === "object" && !Array.isArray(options.audit) + ? options.audit + : graphPersistenceState.authorityConsistencyAudit && + typeof graphPersistenceState.authorityConsistencyAudit === "object" && + !Array.isArray(graphPersistenceState.authorityConsistencyAudit) + ? graphPersistenceState.authorityConsistencyAudit + : null; + if (!audit) { + const auditResult = await runAuthorityConsistencyAudit({ + chatId, + collectionId: options.collectionId, + }); + if (!auditResult?.success || !auditResult?.audit) { + updateGraphPersistenceState({ + authorityRepairState: "error", + authorityRepairUpdatedAt: updatedAt, + authorityRepairError: auditResult?.error || "Authority 审计失败,无法继续修复", + }); + refreshPanelLiveState(); + return { + success: false, + error: auditResult?.error || "Authority 审计失败,无法继续修复", + }; + } + audit = auditResult.audit; + } + + const plan = buildAuthorityConsistencyRepairPlan(audit); + if (plan.blockedIssueCodes.length > 0 && options.force !== true) { + const message = `存在阻塞问题:${plan.blockedIssueCodes.join(", ")}`; + updateGraphPersistenceState({ + authorityRepairState: "error", + authorityRepairUpdatedAt: updatedAt, + authorityRepairError: message, + authorityRepairResult: cloneRuntimeDebugValue( + { + plan, + steps: [], + auditSummary: audit.summary || null, + }, + null, + ), + }); + refreshPanelLiveState(); + return { + success: false, + error: message, + plan, + audit, + }; + } + + if (!plan.ok) { + const result = { + plan, + steps: [], + auditSummary: audit.summary || null, + handoffRequired: false, + finalAuditSummary: audit.summary || null, + finalAuditDrift: audit.drift || null, + }; + updateGraphPersistenceState({ + authorityRepairState: "success", + authorityRepairUpdatedAt: updatedAt, + authorityRepairError: "", + authorityRepairResult: cloneRuntimeDebugValue(result, null), + }); + refreshPanelLiveState(); + return { + success: true, + plan, + results: [], + audit, + handoffRequired: false, + repairResult: result, + }; + } + + updateGraphPersistenceState({ + authorityRepairState: "running", + authorityRepairUpdatedAt: updatedAt, + authorityRepairError: "", + authorityRepairResult: cloneRuntimeDebugValue( + { + plan, + steps: [], + auditSummary: audit.summary || null, + }, + null, + ), + }); + refreshPanelLiveState(); + + try { + const stepResults = []; + let handoffRequired = false; + for (const step of plan.steps) { + let stepOutcome = null; + if (step.action === "write-authority-checkpoint") { + stepOutcome = await writeAuthorityCheckpointFromCurrentGraph({ + chatId, + collectionId: options.collectionId, + reason: "authority-repair-write-checkpoint", + signal: options.signal, + }); + stepResults.push({ + action: step.action, + label: step.label, + detail: step.detail, + success: stepOutcome?.success === true, + submitted: false, + terminal: true, + result: stepOutcome?.result || null, + error: stepOutcome?.error || "", + }); + } else if (step.action === "restore-from-authority-blob-checkpoint") { + stepOutcome = await restoreAuthorityCheckpointFromBlob({ + chatId, + reason: "authority-repair-restore-checkpoint", + signal: options.signal, + }); + stepResults.push({ + action: step.action, + label: step.label, + detail: step.detail, + success: stepOutcome?.success === true, + submitted: false, + terminal: true, + result: stepOutcome?.result || null, + error: stepOutcome?.error || "", + }); + } else if (step.action === "rebuild-authority-trivium") { + stepOutcome = await rebuildAuthorityTrivium({ + chatId, + reason: "authority-repair-trivium-rebuild", + signal: options.signal, + }); + handoffRequired = stepOutcome?.submitted === true && stepOutcome?.terminal === false; + stepResults.push({ + action: step.action, + label: step.label, + detail: step.detail, + success: stepOutcome?.success === true, + submitted: stepOutcome?.submitted === true, + terminal: stepOutcome?.terminal !== false, + mode: stepOutcome?.mode || "", + job: cloneRuntimeDebugValue(stepOutcome?.job, null), + result: stepOutcome?.result || null, + stats: cloneRuntimeDebugValue(stepOutcome?.stats, null), + fallbackError: stepOutcome?.fallbackError || "", + error: stepOutcome?.error || "", + }); + } else { + stepResults.push({ + action: step.action, + label: step.label, + detail: step.detail, + success: false, + submitted: false, + terminal: true, + error: `unsupported action: ${step.action}`, + }); + } + + const latestStep = stepResults[stepResults.length - 1]; + if (!latestStep?.success) { + const failedResult = { + plan, + steps: stepResults, + auditSummary: audit.summary || null, + handoffRequired: false, + finalAuditSummary: null, + finalAuditDrift: null, + }; + updateGraphPersistenceState({ + authorityRepairState: "error", + authorityRepairUpdatedAt: new Date().toISOString(), + authorityRepairError: latestStep?.error || `${step.label} 失败`, + authorityRepairResult: cloneRuntimeDebugValue(failedResult, null), + }); + refreshPanelLiveState(); + return { + success: false, + error: latestStep?.error || `${step.label} 失败`, + plan, + results: stepResults, + audit, + handoffRequired: false, + repairResult: failedResult, + }; + } + if (handoffRequired) { + break; + } + } + + const finalAuditResult = handoffRequired + ? null + : await runAuthorityConsistencyAudit({ + chatId, + collectionId: options.collectionId, + }).catch(() => null); + const finishedAt = new Date().toISOString(); + const repairResult = { + plan, + steps: stepResults, + auditSummary: audit.summary || null, + handoffRequired, + finalAuditSummary: finalAuditResult?.audit?.summary || null, + finalAuditDrift: finalAuditResult?.audit?.drift || null, + }; + updateGraphPersistenceState({ + authorityRepairState: handoffRequired ? "running" : "success", + authorityRepairUpdatedAt: finishedAt, + authorityRepairError: "", + authorityRepairResult: cloneRuntimeDebugValue(repairResult, null), + }); + refreshPanelLiveState(); + return { + success: true, + plan, + results: stepResults, + audit: finalAuditResult?.audit || audit, + handoffRequired, + repairResult, + }; + } catch (error) { + const message = error?.message || String(error) || "Authority repair orchestration failed"; + updateGraphPersistenceState({ + authorityRepairState: "error", + authorityRepairUpdatedAt: new Date().toISOString(), + authorityRepairError: message, + }); + refreshPanelLiveState(); + return { + success: false, + error: message, + plan, + audit, + }; + } +} + async function submitAuthorityVectorRebuildJob({ config = null, range = null, @@ -3273,6 +3728,7 @@ function stopTrackingAuthorityJob(reason = "authority-job-tracking-stopped") { authorityJobPollJobId = ""; authorityJobPollChatId = ""; authorityJobPollPromise = null; + setAuthorityJobTrackingState("idle", reason); } function buildAuthorityJobStatusMeta(job = null, fallbackKind = "") { @@ -3350,6 +3806,10 @@ async function startTrackingAuthorityJob(job = null, options = {}) { authorityJobPollChatId = trackedChatId; const effectiveKind = String(options.kind || normalizedJob.kind || "").trim(); const jobConfig = normalizeAuthorityJobConfig(getSettings()); + setAuthorityJobTrackingState( + jobConfig.preferStream !== false ? "stream" : "polling", + jobConfig.preferStream !== false ? "stream-first" : "polling-only", + ); const applyTrackedJobUpdate = async (nextJob, state = {}) => { const normalizedNextJob = @@ -3408,6 +3868,13 @@ async function startTrackingAuthorityJob(job = null, options = {}) { pollIntervalMs: jobConfig.pollIntervalMs, timeoutMs: jobConfig.waitTimeoutMs, signal: controller.signal, + streamJob: + jobConfig.preferStream !== false + ? async (targetJobId) => { + const adapter = getAuthorityJobAdapter(); + return await adapter.stream(targetJobId, { signal: controller.signal }); + } + : null, loadJob: async (targetJobId) => { const activeChatId = normalizeChatIdCandidate(getCurrentChatId()) || @@ -3420,9 +3887,23 @@ async function startTrackingAuthorityJob(job = null, options = {}) { return await adapter.get(targetJobId, { signal: controller.signal }); }, onUpdate: applyTrackedJobUpdate, + onModeChange: async ({ mode, reason }) => { + if (authorityJobPollAbortController !== controller) { + return; + } + setAuthorityJobTrackingState(mode, reason); + refreshPanelLiveState(); + }, }) .catch((error) => { if (isAbortError(error)) { + if (authorityJobPollAbortController === controller) { + const abortReason = String( + controller.signal?.reason?.message || controller.signal?.reason || "authority-job-tracking-stopped", + ); + setAuthorityJobTrackingState("idle", abortReason); + refreshPanelLiveState(); + } return null; } const message = error?.message || String(error) || "Authority Job 状态轮询失败"; @@ -3440,6 +3921,7 @@ async function startTrackingAuthorityJob(job = null, options = {}) { queueState: "error", }); syncAuthorityVectorJobState(failedJob); + setAuthorityJobTrackingState("error", message); setLastVectorStatus( "Authority Job 失败", buildAuthorityJobStatusMeta(failedJob, effectiveKind || normalizedJob.kind), @@ -21728,6 +22210,12 @@ async function onRunAuthorityConsistencyAudit() { }); } +async function onRunAuthorityConsistencyRepairPlan() { + return await runAuthorityConsistencyRepairPlan({ + reason: "panel-authority-consistency-repair-plan", + }); +} + async function onWriteAuthorityCheckpoint() { return await writeAuthorityCheckpointFromCurrentGraph({ reason: "panel-authority-checkpoint-write", @@ -22344,6 +22832,7 @@ async function onCompactLukerSidecar() { requeueAuthorityJob: async (jobId) => await requeueAuthorityJob(jobId), refreshAuthorityJobs: onRefreshAuthorityJobs, runAuthorityConsistencyAudit: onRunAuthorityConsistencyAudit, + runAuthorityConsistencyRepairPlan: onRunAuthorityConsistencyRepairPlan, writeAuthorityCheckpoint: onWriteAuthorityCheckpoint, restoreAuthorityCheckpoint: onRestoreAuthorityCheckpoint, captureAuthorityPerformanceBaseline: onCaptureAuthorityPerformanceBaseline, diff --git a/maintenance/authority-consistency.js b/maintenance/authority-consistency.js index 3f5af02..3ba02a3 100644 --- a/maintenance/authority-consistency.js +++ b/maintenance/authority-consistency.js @@ -77,6 +77,92 @@ function buildRevisionDelta(left = null, right = null) { return Number(left) - Number(right); } +function normalizeRepairAction(value = "") { + return String(value || "").trim(); +} + +function collectIssueCodes(audit = null) { + return new Set( + (Array.isArray(audit?.issues) ? audit.issues : []) + .map((issue) => String(issue?.code || "").trim()) + .filter(Boolean), + ); +} + +export function buildAuthorityConsistencyRepairPlan(audit = null) { + const source = audit && typeof audit === "object" && !Array.isArray(audit) ? audit : {}; + const actions = Array.isArray(source.actions) + ? source.actions.map(normalizeRepairAction).filter(Boolean) + : []; + const issueCodes = collectIssueCodes(source); + const steps = []; + const addStep = (action, label, detail, codes = []) => { + const normalizedAction = normalizeRepairAction(action); + if (!normalizedAction || !actions.includes(normalizedAction)) { + return; + } + if (Array.isArray(codes) && codes.length > 0) { + const matched = codes.some((code) => issueCodes.has(String(code || "").trim())); + if (!matched) { + return; + } + } + if (steps.some((step) => step.action === normalizedAction)) { + return; + } + steps.push({ + action: normalizedAction, + label: String(label || normalizedAction), + detail: String(detail || ""), + issueCodes: Array.isArray(codes) ? codes.map((code) => String(code || "").trim()).filter(Boolean) : [], + }); + }; + + addStep( + "write-authority-checkpoint", + "写入当前 Checkpoint", + "Authority Blob 尚无 checkpoint,先把当前 runtime 图谱写成 checkpoint,再继续后续修复。", + ["blob-checkpoint-missing"], + ); + addStep( + "restore-from-authority-blob-checkpoint", + "从 Blob Checkpoint 恢复 SQL", + "检测到 runtime / SQL / Blob revision 漂移,可用 Blob checkpoint 回灌 Authority SQL。", + ["sql-runtime-revision-drift", "blob-runtime-revision-drift"], + ); + addStep( + "rebuild-authority-trivium", + "重建 Authority Trivium", + "Trivium 与 SQL revision 不一致,或当前向量索引仍为 dirty,需要重建 Trivium。", + ["trivium-sql-revision-drift", "trivium-collection-mismatch", "vector-dirty"], + ); + + const blockedIssueCodes = (Array.isArray(source.issues) ? source.issues : []) + .filter((issue) => String(issue?.severity || "") === "error") + .map((issue) => String(issue?.code || "").trim()) + .filter(Boolean); + const unsupportedActions = actions.filter( + (action) => action !== "run-authority-consistency-audit" && !steps.some((step) => step.action === action), + ); + const detail = steps.length + ? `建议顺序:${steps.map((step) => step.label).join(" → ")}` + : String(source?.summary?.detail || "当前审计未发现需要自动编排的修复步骤"); + + return { + ok: steps.length > 0, + steps, + stepCount: steps.length, + requiresConfirmation: steps.some((step) => step.action === "restore-from-authority-blob-checkpoint"), + blockedIssueCodes, + unsupportedActions, + summary: { + level: steps.length > 0 ? "warning" : String(source?.summary?.level || "idle"), + label: steps.length > 0 ? `建议修复 ${steps.length} 步` : "当前无需编排修复", + detail, + }, + }; +} + export function buildAuthorityCheckpointImportSnapshot(checkpoint = null, options = {}) { const normalizedCheckpoint = checkpoint && typeof checkpoint === "object" && !Array.isArray(checkpoint) diff --git a/maintenance/authority-diagnostics-bundle.js b/maintenance/authority-diagnostics-bundle.js index 7fd6264..54c50b2 100644 --- a/maintenance/authority-diagnostics-bundle.js +++ b/maintenance/authority-diagnostics-bundle.js @@ -57,7 +57,7 @@ function buildCompactTimestamp(date = new Date()) { } const AUTHORITY_DIAGNOSTICS_MANIFEST_VERSION = 1; -const AUTHORITY_DIAGNOSTICS_MANIFEST_LIMIT = 12; +export const AUTHORITY_DIAGNOSTICS_MANIFEST_LIMIT = 12; function isSensitiveKey(key = "") { return /(api[_-]?key|token|secret|password|authorization|auth[_-]?header|cookie)/i.test( @@ -458,6 +458,19 @@ function buildAuthorityDiagnosticsManifestEntries(entries = [], limit = AUTHORIT return normalizedEntries.slice(0, normalizeManifestLimit(limit)); } +function buildAuthorityDiagnosticsPrunedEntries(previousEntries = [], nextEntries = []) { + const normalizedPreviousEntries = buildAuthorityDiagnosticsManifestEntries( + previousEntries, + Number.MAX_SAFE_INTEGER, + ); + const keptPaths = new Set( + buildAuthorityDiagnosticsManifestEntries(nextEntries, Number.MAX_SAFE_INTEGER).map( + (entry) => entry.path, + ), + ); + return normalizedPreviousEntries.filter((entry) => !keptPaths.has(entry.path)); +} + function normalizeAuthorityDiagnosticsManifest(manifest = null, options = {}) { const source = manifest && typeof manifest === "object" && !Array.isArray(manifest) ? manifest : {}; const chatId = normalizeRecordId(options.chatId || source.chatId); @@ -560,10 +573,12 @@ export async function upsertAuthorityDiagnosticsManifestEntry(adapter, entry = n signal: options.signal, limit: options.limit, }); - const entries = buildAuthorityDiagnosticsManifestEntries( - [normalizedEntry, ...current.entries.filter((item) => item.path !== normalizedEntry.path)], - options.limit, - ); + const candidateEntries = [ + normalizedEntry, + ...current.entries.filter((item) => item.path !== normalizedEntry.path), + ]; + const entries = buildAuthorityDiagnosticsManifestEntries(candidateEntries, options.limit); + const prunedEntries = buildAuthorityDiagnosticsPrunedEntries(candidateEntries, entries); const writeResult = await writeAuthorityDiagnosticsManifest( adapter, { @@ -583,6 +598,7 @@ export async function upsertAuthorityDiagnosticsManifestEntry(adapter, entry = n return { ...writeResult, entry: normalizedEntry, + prunedEntries, }; } diff --git a/maintenance/authority-job-adapter.js b/maintenance/authority-job-adapter.js index 6c8e593..76a572e 100644 --- a/maintenance/authority-job-adapter.js +++ b/maintenance/authority-job-adapter.js @@ -63,6 +63,56 @@ function normalizeJobStatus(value = "queued") { return String(value || "queued").trim().toLowerCase() || "queued"; } +function hasAsyncIterator(value = null) { + return !!value && typeof value[Symbol.asyncIterator] === "function"; +} + +function hasIterator(value = null) { + return !!value && typeof value[Symbol.iterator] === "function"; +} + +function readJobStreamPayload(event = null) { + const source = event && typeof event === "object" && !Array.isArray(event) ? event : null; + if (!source) return event; + if (source.job && typeof source.job === "object" && !Array.isArray(source.job)) { + return source.job; + } + if (source.result && typeof source.result === "object" && !Array.isArray(source.result)) { + return source.result; + } + if (source.payload && typeof source.payload === "object" && !Array.isArray(source.payload)) { + return source.payload; + } + if (source.data && typeof source.data === "object" && !Array.isArray(source.data)) { + return source.data; + } + return source; +} + +async function* normalizeAuthorityJobStream(source = null, options = {}) { + if (hasAsyncIterator(source)) { + for await (const event of source) { + throwIfAborted(options.signal); + const job = normalizeAuthorityJobRecord(readJobStreamPayload(event)); + if (job.id || job.status || job.error) { + yield job; + } + } + return; + } + if (hasIterator(source)) { + for (const event of source) { + throwIfAborted(options.signal); + const job = normalizeAuthorityJobRecord(readJobStreamPayload(event)); + if (job.id || job.status || job.error) { + yield job; + } + } + return; + } + throw new Error("Authority Jobs stream unavailable"); +} + function readJobRows(payload = null) { if (Array.isArray(payload)) return payload; if (!payload || typeof payload !== "object") return []; @@ -192,6 +242,7 @@ export function normalizeAuthorityJobConfig(settings = {}, overrides = {}) { baseUrl: normalizeAuthorityBaseUrl(source.authorityBaseUrl ?? source.baseUrl), enabled: source.authorityJobsEnabled !== false && source.jobsEnabled !== false, failOpen: source.authorityFailOpen !== false && source.failOpen !== false, + preferStream: source.authorityJobPreferStream !== false && source.jobStreamPreferred !== false, pollIntervalMs: normalizeInteger(source.authorityJobPollIntervalMs ?? source.pollIntervalMs, 1200, 250, 30000), waitTimeoutMs: normalizeInteger(source.authorityJobWaitTimeoutMs ?? source.waitTimeoutMs, 0, 0, 3600000), ...overrides, @@ -271,6 +322,15 @@ async function callClient(client, methodNames = [], action = "request", payload throw new Error(`Authority Jobs ${action} unavailable`); } +async function callStreamClient(client, methodNames = [], payload = {}) { + for (const methodName of methodNames) { + if (typeof client?.[methodName] === "function") { + return await client[methodName](payload); + } + } + throw new Error("Authority Jobs stream unavailable"); +} + function throwIfAborted(signal) { if (signal?.aborted) { throw signal.reason instanceof Error @@ -331,6 +391,24 @@ export class AuthorityJobAdapter { return normalizeAuthorityJobRecord(result?.job || result?.result || result); } + async stream(jobId, options = {}) { + throwIfAborted(options.signal); + const id = normalizeRecordId(jobId); + if (!id) { + return normalizeAuthorityJobStream([], options); + } + const source = await callStreamClient( + this.client, + ["stream", "streamJob", "watch", "watchJob"], + { + jobId: id, + id, + signal: options.signal, + }, + ); + return normalizeAuthorityJobStream(source, options); + } + async waitForCompletion(jobId, options = {}) { throwIfAborted(options.signal); const id = normalizeRecordId(jobId); diff --git a/maintenance/authority-job-tracker.js b/maintenance/authority-job-tracker.js index 4e656e9..748d052 100644 --- a/maintenance/authority-job-tracker.js +++ b/maintenance/authority-job-tracker.js @@ -29,15 +29,52 @@ function sleep(ms, signal) { }); } +function hasAsyncIterator(value = null) { + return !!value && typeof value[Symbol.asyncIterator] === "function"; +} + +function readStreamJobUpdate(event = null) { + const source = event && typeof event === "object" && !Array.isArray(event) ? event : null; + if (!source) { + return event; + } + if (source.job && typeof source.job === "object" && !Array.isArray(source.job)) { + return source.job; + } + if (source.result && typeof source.result === "object" && !Array.isArray(source.result)) { + return source.result; + } + if (source.payload && typeof source.payload === "object" && !Array.isArray(source.payload)) { + return source.payload; + } + if (source.data && typeof source.data === "object" && !Array.isArray(source.data)) { + return source.data; + } + return source; +} + +function buildTimeoutJob(job = null) { + const latest = job && typeof job === "object" && !Array.isArray(job) ? job : {}; + return { + ...latest, + status: "timeout", + terminal: true, + success: false, + error: String(latest?.error || "wait timeout"), + }; +} + export async function trackAuthorityJobUntilTerminal({ initialJob = null, loadJob, + streamJob = null, onUpdate = null, + onModeChange = null, pollIntervalMs = 1200, timeoutMs = 0, signal = undefined, } = {}) { - if (typeof loadJob !== "function") { + if (typeof loadJob !== "function" && typeof streamJob !== "function") { throw new Error("Authority job loader unavailable"); } const initial = @@ -51,30 +88,96 @@ export async function trackAuthorityJobUntilTerminal({ const startedAt = Date.now(); let latest = { ...initial }; + const emitModeChange = async (mode, reason = "") => { + if (typeof onModeChange === "function") { + await onModeChange({ + mode: String(mode || "idle"), + reason: String(reason || ""), + elapsedMs: Date.now() - startedAt, + }); + } + }; if (typeof onUpdate === "function") { await onUpdate(latest, { phase: "initial", elapsedMs: 0, + transport: typeof streamJob === "function" ? "stream" : "polling", }); } if (latest.terminal) { return latest; } + if (typeof streamJob === "function") { + let streamFailureReason = "stream-ended"; + await emitModeChange("stream", "stream-first"); + try { + const stream = await streamJob(jobId, { + signal, + previousJob: latest, + elapsedMs: 0, + }); + if (!hasAsyncIterator(stream)) { + throw new Error("Authority Jobs stream unavailable"); + } + for await (const event of stream) { + throwIfAborted(signal); + if (timeoutMs > 0 && Date.now() - startedAt >= timeoutMs) { + latest = buildTimeoutJob(latest); + if (typeof onUpdate === "function") { + await onUpdate(latest, { + phase: "timeout", + elapsedMs: Date.now() - startedAt, + transport: "stream", + }); + } + return latest; + } + const nextJob = readStreamJobUpdate(event); + if (!nextJob || typeof nextJob !== "object" || Array.isArray(nextJob)) { + continue; + } + latest = { + ...latest, + ...nextJob, + }; + if (typeof onUpdate === "function") { + await onUpdate(latest, { + phase: latest?.terminal ? "terminal" : "stream", + elapsedMs: Date.now() - startedAt, + transport: "stream", + }); + } + if (latest?.terminal) { + return latest; + } + } + } catch (error) { + if (error?.name === "AbortError") { + throw error; + } + if (typeof loadJob !== "function") { + throw error; + } + streamFailureReason = error?.message || String(error) || "stream-fallback"; + } + if (typeof loadJob !== "function") { + throw new Error("Authority job stream ended before terminal state"); + } + await emitModeChange("polling", streamFailureReason); + } else { + await emitModeChange("polling", "polling-only"); + } + while (true) { throwIfAborted(signal); if (timeoutMs > 0 && Date.now() - startedAt >= timeoutMs) { - latest = { - ...latest, - status: "timeout", - terminal: true, - success: false, - error: String(latest?.error || "wait timeout"), - }; + latest = buildTimeoutJob(latest); if (typeof onUpdate === "function") { await onUpdate(latest, { phase: "timeout", elapsedMs: Date.now() - startedAt, + transport: "polling", }); } return latest; @@ -90,6 +193,7 @@ export async function trackAuthorityJobUntilTerminal({ await onUpdate(latest, { phase: latest?.terminal ? "terminal" : "poll", elapsedMs: Date.now() - startedAt, + transport: "polling", }); } if (latest?.terminal) { diff --git a/package.json b/package.json index 76d945d..6401d88 100644 --- a/package.json +++ b/package.json @@ -17,6 +17,7 @@ "bench:persist-load": "node tests/perf/persist-load-bench.mjs", "bench:persist-load:native-hydrate": "node tests/perf/persist-load-bench.mjs --native-hydrate", "bench:load-preapply": "node tests/perf/load-preapply-bench.mjs", + "bench:authority-recall": "node tests/perf/authority-recall-bench.mjs", "bench:p1-compare": "node scripts/compare-p1-bench.mjs", "bench:native": "npm run bench:graph-layout && npm run bench:persist-delta", "test:indexeddb": "npm run test:indexeddb-persistence && npm run test:indexeddb-sync && npm run test:indexeddb-migration", diff --git a/tests/authority-consistency.mjs b/tests/authority-consistency.mjs index 982d1e9..e8f9de4 100644 --- a/tests/authority-consistency.mjs +++ b/tests/authority-consistency.mjs @@ -5,6 +5,7 @@ import { applyAuthorityCheckpointToStore, buildAuthorityCheckpointImportSnapshot, buildAuthorityConsistencyAudit, + buildAuthorityConsistencyRepairPlan, } from "../maintenance/authority-consistency.js"; const graph = createEmptyGraph(); @@ -108,6 +109,9 @@ assert.equal(auditAligned.summary.level, "success"); assert.equal(auditAligned.issues.length, 0); assert.equal(auditAligned.drift.checkpointRestorable, true); assert.ok(auditAligned.actions.includes("restore-from-authority-blob-checkpoint")); +const alignedRepairPlan = buildAuthorityConsistencyRepairPlan(auditAligned); +assert.equal(alignedRepairPlan.ok, false); +assert.equal(alignedRepairPlan.stepCount, 0); const auditDrift = buildAuthorityConsistencyAudit({ chatId: "chat-a", @@ -146,5 +150,36 @@ assert.ok(auditDrift.issues.some((issue) => issue.code === "sql-runtime-revision assert.ok(auditDrift.issues.some((issue) => issue.code === "vector-dirty")); assert.ok(auditDrift.actions.includes("rebuild-authority-trivium")); assert.ok(auditDrift.actions.includes("write-authority-checkpoint")); +const driftRepairPlan = buildAuthorityConsistencyRepairPlan(auditDrift); +assert.equal(driftRepairPlan.ok, true); +assert.equal(driftRepairPlan.requiresConfirmation, false); +assert.deepEqual( + driftRepairPlan.steps.map((step) => step.action), + [ + "write-authority-checkpoint", + "rebuild-authority-trivium", + ], +); + +const restoreRepairPlan = buildAuthorityConsistencyRepairPlan({ + issues: [ + { + severity: "warning", + code: "sql-runtime-revision-drift", + message: "SQL revision drift", + }, + ], + actions: ["restore-from-authority-blob-checkpoint"], + summary: { + level: "warning", + detail: "runtime / SQL drift", + }, +}); +assert.equal(restoreRepairPlan.ok, true); +assert.equal(restoreRepairPlan.requiresConfirmation, true); +assert.deepEqual( + restoreRepairPlan.steps.map((step) => step.action), + ["restore-from-authority-blob-checkpoint"], +); console.log("authority-consistency tests passed"); diff --git a/tests/authority-diagnostics-bundle.mjs b/tests/authority-diagnostics-bundle.mjs index 36abc88..9d5b705 100644 --- a/tests/authority-diagnostics-bundle.mjs +++ b/tests/authority-diagnostics-bundle.mjs @@ -369,4 +369,32 @@ function createMockAdapter() { assert.equal(removeResult.entries[0].path, "user/files/diag-b.json"); } +{ + const adapter = createMockAdapter(); + await upsertAuthorityDiagnosticsManifestEntry(adapter, { + chatId: "chat-main", + path: "user/files/diag-1.json", + reason: "first", + size: 80, + updatedAt: "2026-01-01T00:00:00.000Z", + }, { limit: 2 }); + await upsertAuthorityDiagnosticsManifestEntry(adapter, { + chatId: "chat-main", + path: "user/files/diag-2.json", + reason: "second", + size: 90, + updatedAt: "2026-01-02T00:00:00.000Z", + }, { limit: 2 }); + const pruneResult = await upsertAuthorityDiagnosticsManifestEntry(adapter, { + chatId: "chat-main", + path: "user/files/diag-3.json", + reason: "third", + size: 100, + updatedAt: "2026-01-03T00:00:00.000Z", + }, { limit: 2 }); + assert.equal(pruneResult.entries.length, 2); + assert.equal(pruneResult.prunedEntries.length, 1); + assert.equal(pruneResult.prunedEntries[0].path, "user/files/diag-1.json"); +} + console.log("authority-diagnostics-bundle tests passed"); diff --git a/tests/authority-jobs.mjs b/tests/authority-jobs.mjs index 6581cd8..04d172b 100644 --- a/tests/authority-jobs.mjs +++ b/tests/authority-jobs.mjs @@ -197,6 +197,95 @@ assert.deepEqual(trackerPhases, [ ["terminal", "completed", 1], ]); +const streamedPhases = []; +const streamedModes = []; +const streamedJob = await trackAuthorityJobUntilTerminal({ + initialJob: { + id: "job-stream", + kind: "authority.vector.rebuild", + status: "queued", + progress: 0, + terminal: false, + success: false, + }, + async streamJob(jobId) { + return (async function* () { + yield { + job: { + id: jobId, + kind: "authority.vector.rebuild", + status: "running", + progress: 0.5, + terminal: false, + success: false, + }, + }; + yield { + job: { + id: jobId, + kind: "authority.vector.rebuild", + status: "completed", + progress: 1, + terminal: true, + success: true, + }, + }; + })(); + }, + async onModeChange(state) { + streamedModes.push([state.mode, state.reason]); + }, + async onUpdate(job, state) { + streamedPhases.push([state.phase, state.transport, job.status, Number(job.progress || 0)]); + }, +}); +assert.equal(streamedJob.status, "completed"); +assert.equal(streamedJob.success, true); +assert.deepEqual(streamedModes, [["stream", "stream-first"]]); +assert.deepEqual(streamedPhases, [ + ["initial", "stream", "queued", 0], + ["stream", "stream", "running", 0.5], + ["terminal", "stream", "completed", 1], +]); + +const fallbackModes = []; +let fallbackLoadCount = 0; +const fallbackTrackedJob = await trackAuthorityJobUntilTerminal({ + initialJob: { + id: "job-fallback", + kind: "authority.vector.rebuild", + status: "queued", + progress: 0, + terminal: false, + success: false, + }, + pollIntervalMs: 0, + timeoutMs: 1000, + async streamJob() { + throw new Error("stream offline"); + }, + async loadJob(jobId) { + fallbackLoadCount += 1; + return { + id: jobId, + kind: "authority.vector.rebuild", + status: "completed", + progress: 1, + terminal: true, + success: true, + }; + }, + async onModeChange(state) { + fallbackModes.push([state.mode, state.reason]); + }, +}); +assert.equal(fallbackTrackedJob.status, "completed"); +assert.equal(fallbackLoadCount, 1); +assert.deepEqual(fallbackModes, [ + ["stream", "stream-first"], + ["polling", "stream offline"], +]); + const timedOutJob = await trackAuthorityJobUntilTerminal({ initialJob: { id: "job-timeout", @@ -221,6 +310,45 @@ assert.equal(timedOutJob.status, "timeout"); assert.equal(timedOutJob.terminal, true); assert.equal(timedOutJob.success, false); +const streamingClient = { + async streamJob(payload) { + return (async function* () { + yield { + job: { + id: payload.jobId, + kind: "authority.vector.rebuild", + status: "running", + progress: 0.25, + terminal: false, + success: false, + }, + }; + yield { + job: { + id: payload.jobId, + kind: "authority.vector.rebuild", + status: "completed", + progress: 1, + terminal: true, + success: true, + }, + }; + })(); + }, +}; +const streamingAdapter = createAuthorityJobAdapter( + { authorityBaseUrl: "/api/plugins/authority" }, + { jobClient: streamingClient }, +); +const streamedUpdates = []; +for await (const update of await streamingAdapter.stream("job-stream-adapter")) { + streamedUpdates.push([update.status, Number(update.progress || 0)]); +} +assert.deepEqual(streamedUpdates, [ + ["running", 0.25], + ["completed", 1], +]); + function createVectorControllerRuntime(overrides = {}) { const calls = []; const signal = {}; diff --git a/tests/perf/authority-recall-bench.mjs b/tests/perf/authority-recall-bench.mjs new file mode 100644 index 0000000..9da700a --- /dev/null +++ b/tests/perf/authority-recall-bench.mjs @@ -0,0 +1,329 @@ +import { performance } from "node:perf_hooks"; + +import { + installResolveHooks, + toDataModuleUrl, +} from "../helpers/register-hooks-compat.mjs"; + +installResolveHooks([ + { + specifiers: ["../../../../../script.js"], + url: toDataModuleUrl("export function getRequestHeaders() { return {}; }"), + }, + { + specifiers: ["../../../../extensions.js"], + url: toDataModuleUrl("export const extension_settings = { st_bme: {} };") , + }, +]); + +const outputJson = process.argv.includes("--json"); +const RUNS = 5; +const SIZE_PRESETS = [ + { label: "M", totalNodes: 1200 }, + { label: "L", totalNodes: 3600 }, + { label: "XL", totalNodes: 7200 }, +]; + +const { normalizeAuthorityVectorConfig } = await import( + "../../vector/authority-vector-primary-adapter.js" +); +const { resolveAuthorityRecallCandidates } = await import( + "../../retrieval/authority-candidate-provider.js" +); + +function summarize(values = []) { + if (!values.length) { + return { avg: 0, p95: 0, min: 0, max: 0 }; + } + const sorted = [...values].sort((left, right) => left - right); + const sum = sorted.reduce((acc, value) => acc + value, 0); + const p95Index = Math.min(sorted.length - 1, Math.floor(sorted.length * 0.95)); + return { + avg: sum / sorted.length, + p95: sorted[p95Index], + min: sorted[0], + max: sorted[sorted.length - 1], + }; +} + +function formatSummary(label, summary = {}) { + return `${label} avg=${Number(summary.avg || 0).toFixed(2)}ms p95=${Number(summary.p95 || 0).toFixed(2)}ms min=${Number(summary.min || 0).toFixed(2)}ms max=${Number(summary.max || 0).toFixed(2)}ms`; +} + +function formatPercent(value = 0) { + return `${(Math.max(0, Number(value) || 0) * 100).toFixed(1)}%`; +} + +function buildNode(id, { + title = "", + summary = "", + regionKey = "archive", + storySegmentId = "seg-archive", + ownerId = "", + ownerName = "", + type = "event", + seq = 0, +} = {}) { + return { + id, + type, + archived: false, + seq, + importance: ownerId ? 8 : 4, + fields: { + title, + summary, + }, + scope: { + layer: ownerId ? "pov" : "objective", + ownerType: ownerId ? "character" : "", + ownerId, + ownerName, + bucket: ownerId ? "characterPov" : "objectiveGlobal", + regionKey, + }, + storySegmentId, + }; +} + +function createSyntheticRecallScenario({ label, totalNodes }) { + const chatId = `bench-authority-recall-${label.toLowerCase()}`; + const collectionId = `st-bme:${chatId}:nodes`; + const relevantNodes = [ + buildNode("node-alice-memory", { + title: "Alice remembers the silver key", + summary: "Alice knows the silver key is hidden behind the archive ledger.", + ownerId: "Alice", + ownerName: "Alice", + type: "pov_memory", + seq: 11, + }), + buildNode("node-archive-gate", { + title: "Archive gate opened", + summary: "The archive gate has just been unlocked.", + seq: 10, + }), + buildNode("node-vault", { + title: "Vault mechanism", + summary: "The hidden vault opens only after the archive gate is cleared.", + seq: 12, + }), + buildNode("node-ledger", { + title: "Ledger note", + summary: "The ledger mentions a silver key and a hidden switch.", + seq: 13, + }), + buildNode("node-guard", { + title: "Archive guard patrol", + summary: "A guard patrol circles the archive stairs every few minutes.", + seq: 14, + }), + buildNode("node-context", { + title: "Archive dust trail", + summary: "Fresh dust suggests someone visited the archive recently.", + seq: 15, + }), + ]; + const fillerNodes = []; + for (let index = 0; index < Math.max(0, totalNodes - relevantNodes.length); index += 1) { + const inArchive = index % 9 === 0; + fillerNodes.push( + buildNode(`node-filler-${index}`, { + title: `Filler ${index}`, + summary: `Background recall filler ${index}`, + regionKey: inArchive ? "archive" : index % 2 === 0 ? "market" : "harbor", + storySegmentId: inArchive ? "seg-archive" : index % 2 === 0 ? "seg-market" : "seg-harbor", + seq: 100 + index, + }), + ); + } + const nodes = [...relevantNodes, ...fillerNodes]; + const graph = { + version: 1, + nodes, + edges: [], + historyState: { + chatId, + }, + vectorIndexState: { + collectionId, + dirty: false, + }, + }; + return { + chatId, + collectionId, + graph, + availableNodes: nodes, + relevantIds: relevantNodes.map((node) => node.id), + filterIds: [ + "node-alice-memory", + "node-archive-gate", + "node-vault", + "node-ledger", + "node-context", + "node-filler-0", + "node-filler-9", + ], + searchResults: [ + { nodeId: "node-alice-memory", score: 0.99 }, + { nodeId: "node-vault", score: 0.93 }, + { nodeId: "node-ledger", score: 0.9 }, + { nodeId: "node-context", score: 0.84 }, + { nodeId: "node-outside", score: 0.8 }, + ], + neighbors: [ + { fromId: "node-alice-memory", toId: "node-vault" }, + { fromId: "node-vault", toId: "node-ledger" }, + { fromId: "node-ledger", toId: "node-context" }, + { fromId: "node-vault", toId: "node-archive-gate" }, + ], + }; +} + +function createBenchTriviumClient(scenario) { + return { + async filterWhere() { + return { + items: scenario.filterIds.map((nodeId) => ({ externalId: nodeId })), + }; + }, + async search() { + return { + results: scenario.searchResults, + }; + }, + async neighbors() { + return { + neighbors: scenario.neighbors, + }; + }, + }; +} + +function computeCoverage(candidateNodes = [], relevantIds = []) { + const candidateSet = new Set( + (Array.isArray(candidateNodes) ? candidateNodes : []) + .map((node) => String(node?.id || "").trim()) + .filter(Boolean), + ); + const normalizedRelevantIds = (Array.isArray(relevantIds) ? relevantIds : []) + .map((value) => String(value || "").trim()) + .filter(Boolean); + if (!normalizedRelevantIds.length) { + return 0; + } + const hits = normalizedRelevantIds.filter((value) => candidateSet.has(value)).length; + return hits / normalizedRelevantIds.length; +} + +async function runPreset(preset) { + const scenario = createSyntheticRecallScenario(preset); + const totalTimings = []; + const filterTimings = []; + const searchTimings = []; + const neighborTimings = []; + const coverages = []; + const reductionRatios = []; + const candidateCounts = []; + let usedCount = 0; + + for (let runIndex = 0; runIndex < RUNS; runIndex += 1) { + const triviumClient = createBenchTriviumClient(scenario); + const config = normalizeAuthorityVectorConfig( + { + authorityBaseUrl: "/api/plugins/authority", + authorityVectorFailOpen: true, + }, + { triviumClient }, + ); + const startedAt = performance.now(); + const result = await resolveAuthorityRecallCandidates({ + graph: scenario.graph, + userMessage: "Alice 现在在 archive 里找 silver key 和 hidden vault 吗?", + recentMessages: [ + "assistant: Alice just unlocked the archive gate.", + "assistant: The ledger may mention a hidden switch.", + ], + embeddingConfig: config, + availableNodes: scenario.availableNodes, + activeRegion: "archive", + activeStoryContext: { + activeSegmentId: "seg-archive", + }, + activeRecallOwnerKeys: ["character:Alice"], + sceneOwnerCandidates: [ + { + ownerKey: "character:Alice", + ownerName: "Alice", + }, + ], + options: { + enabled: true, + topK: 8, + maxRecallNodes: 6, + limit: 24, + neighborLimit: 6, + minimumUsedCandidateCount: 4, + enableMultiIntent: true, + }, + }); + totalTimings.push(performance.now() - startedAt); + filterTimings.push(Number(result?.diagnostics?.timings?.filter || 0)); + searchTimings.push(Number(result?.diagnostics?.timings?.search || 0)); + neighborTimings.push(Number(result?.diagnostics?.timings?.neighbors || 0)); + coverages.push(computeCoverage(result?.candidateNodes, scenario.relevantIds)); + reductionRatios.push( + scenario.availableNodes.length > 0 + ? Number((result?.candidateNodes?.length || 0) / scenario.availableNodes.length) + : 0, + ); + candidateCounts.push(Number(result?.candidateNodes?.length || 0)); + if (result?.used) { + usedCount += 1; + } + } + + return { + label: preset.label, + totalNodes: scenario.availableNodes.length, + relevantNodeCount: scenario.relevantIds.length, + candidateCount: summarize(candidateCounts), + timings: { + total: summarize(totalTimings), + filter: summarize(filterTimings), + search: summarize(searchTimings), + neighbors: summarize(neighborTimings), + }, + quality: { + coverage: summarize(coverages), + reductionRatio: summarize(reductionRatios), + usedRate: usedCount / RUNS, + }, + }; +} + +const results = []; +for (const preset of SIZE_PRESETS) { + results.push(await runPreset(preset)); +} + +if (outputJson) { + console.log(JSON.stringify({ + kind: "st-bme-authority-recall-bench", + runs: RUNS, + results, + }, null, 2)); +} else { + console.log(`Authority recall candidate bench (synthetic) · runs=${RUNS}`); + for (const result of results) { + console.log(`\n[${result.label}] nodes=${result.totalNodes} relevant=${result.relevantNodeCount}`); + console.log(formatSummary("total", result.timings.total)); + console.log(formatSummary("filter", result.timings.filter)); + console.log(formatSummary("search", result.timings.search)); + console.log(formatSummary("neighbors", result.timings.neighbors)); + console.log( + `coverage avg=${formatPercent(result.quality.coverage.avg)} reduction avg=${formatPercent(result.quality.reductionRatio.avg)} usedRate=${formatPercent(result.quality.usedRate)} candidate avg=${Number(result.candidateCount.avg || 0).toFixed(1)}`, + ); + } +} diff --git a/ui/panel.js b/ui/panel.js index 33a30d1..dea15a3 100644 --- a/ui/panel.js +++ b/ui/panel.js @@ -58,6 +58,7 @@ import { getSuggestedBackendModel, getVectorIndexStats, } from "../vector/vector-index.js"; +import { buildAuthorityConsistencyRepairPlan } from "../maintenance/authority-consistency.js"; let defaultPromptCache = null; @@ -3104,6 +3105,42 @@ function _refreshTaskPersistence() { const authorityRestoreUpdatedLabel = ps.authorityCheckpointRestoreUpdatedAt ? _formatTaskProfileTime(ps.authorityCheckpointRestoreUpdatedAt) : "—"; + const authorityRepairResult = + ps.authorityRepairResult && + typeof ps.authorityRepairResult === "object" && + !Array.isArray(ps.authorityRepairResult) + ? ps.authorityRepairResult + : null; + const authorityRepairPlan = buildAuthorityConsistencyRepairPlan(ps.authorityConsistencyAudit); + const authorityRepairState = String(ps.authorityRepairState || "idle").trim(); + const authorityRepairHandoffJobId = String( + authorityRepairResult?.steps?.find((step) => step?.submitted && step?.job?.id)?.job?.id || "", + ).trim(); + const authorityRepairLabel = + authorityRepairState === "success" + ? "修复完成" + : authorityRepairState === "error" + ? "修复失败" + : authorityRepairState === "running" + ? authorityRepairResult?.handoffRequired + ? "等待 Job 交接" + : "修复中" + : "未执行"; + const authorityRepairUpdatedLabel = ps.authorityRepairUpdatedAt + ? _formatTaskProfileTime(ps.authorityRepairUpdatedAt) + : "—"; + const authorityRepairPlanLabel = authorityRepairPlan.ok + ? authorityRepairPlan.steps.map((step) => step.label).join(" → ") + : authorityRepairPlan.summary.label || "当前无需编排修复"; + const authorityRepairResultLabel = authorityRepairResult?.steps?.length + ? `${Number(authorityRepairResult.steps.length || 0)} 步${ + authorityRepairResult?.handoffRequired + ? authorityRepairHandoffJobId + ? ` · job ${authorityRepairHandoffJobId}` + : " · 已交接异步 Job" + : "" + }` + : "—"; const authorityBaseline = ps.authorityPerformanceBaseline && typeof ps.authorityPerformanceBaseline === "object" && @@ -3160,6 +3197,15 @@ function _refreshTaskPersistence() { const authorityArtifactHistoryUpdatedLabel = ps.authorityDiagnosticsArtifactsUpdatedAt ? _formatTaskProfileTime(ps.authorityDiagnosticsArtifactsUpdatedAt) : "—"; + const authorityJobTrackingLabel = (() => { + const mode = String(ps.authorityJobTrackingMode || "idle").trim() || "idle"; + const reason = String(ps.authorityJobTrackingReason || "").trim(); + return reason ? `${mode} · ${reason}` : mode; + })(); + const authorityArtifactRetentionLabel = `最近 ${Number(ps.authorityDiagnosticsRetentionLimit || 0) || 0} 条`; + const authorityArtifactPruneLabel = ps.authorityDiagnosticsLastPrunedAt + ? `${Number(ps.authorityDiagnosticsLastPrunedCount || 0)} 条 · ${_formatTaskProfileTime(ps.authorityDiagnosticsLastPrunedAt)}` + : "未触发"; const activeRegionLabel = String( historyState?.activeRegion || historyState?.lastExtractedRegion || @@ -3262,10 +3308,15 @@ function _refreshTaskPersistence() { ["Blob rev", authorityAuditBlobRevision], ["Blob path", authorityAuditBlobPath], ["建议动作", authorityAuditActionsLabel], + ["建议修复", authorityRepairPlanLabel], + ["修复状态", authorityRepairLabel], + ["修复结果", authorityRepairResultLabel], ["最近审计", authorityAuditUpdatedLabel], + ["最近修复", authorityRepairUpdatedLabel], ["恢复状态", authorityRestoreLabel], ["恢复结果", authorityRestoreResult?.revision ? `rev ${Number(authorityRestoreResult.revision)}` : "—"], ["最近恢复", authorityRestoreUpdatedLabel], + ["Job 追踪", authorityJobTrackingLabel], ["Baseline 图谱", authorityBaselineGraphLabel], ["Baseline Load", authorityBaselineLoadLabel], ["Baseline Persist", authorityBaselinePersistLabel], @@ -3282,11 +3333,17 @@ function _refreshTaskPersistence() { ["诊断包原因", ps.authorityDiagnosticsBundleReason || "—"], ["诊断清单", authorityArtifactManifestPathLabel], ["工件记录", `${authorityArtifactEntries.length} 条`], + ["Retention", authorityArtifactRetentionLabel], + ["最近 Prune", authorityArtifactPruneLabel], + ["Prune 错误", ps.authorityDiagnosticsLastPruneError || ""], ["列表刷新", authorityArtifactHistoryUpdatedLabel], ]; const authorityAuditActions = Array.isArray(ps.authorityConsistencyAudit?.actions) ? ps.authorityConsistencyAudit.actions.map((value) => String(value || "").trim()).filter(Boolean) : []; + const showAuthorityRepairAction = + authorityRepairPlan.ok && + authorityRepairPlan.blockedIssueCodes.length === 0; const showAuthorityCheckpointWriteAction = authorityAuditActions.includes("write-authority-checkpoint") || (!ps.authorityBlobCheckpointPath && ps.authorityBlobReady); @@ -3296,6 +3353,9 @@ function _refreshTaskPersistence() { typeof _actionHandlers.runAuthorityConsistencyAudit === "function" ? `` : "", + showAuthorityRepairAction && typeof _actionHandlers.runAuthorityConsistencyRepairPlan === "function" + ? `` + : "", showAuthorityCheckpointWriteAction && typeof _actionHandlers.writeAuthorityCheckpoint === "function" ? `` : "", @@ -3386,8 +3446,10 @@ function _refreshTaskPersistence() { ${renderRowsTwoColumn(authorityRows)}