diff --git a/graph/graph-persistence.js b/graph/graph-persistence.js index 87add57..d4628ab 100644 --- a/graph/graph-persistence.js +++ b/graph/graph-persistence.js @@ -1,7 +1,7 @@ // ST-BME: 图谱持久化常量与纯工具函数 // 不依赖 index.js 模块级可变状态(currentGraph / graphPersistenceState 等) -import { deserializeGraph, serializeGraph } from "./graph.js"; +import { deserializeGraph, getGraphStats, serializeGraph } from "./graph.js"; import { normalizeGraphRuntimeState } from "../runtime/runtime-state.js"; // ═══════════════════════════════════════════════════════════ @@ -10,6 +10,7 @@ import { normalizeGraphRuntimeState } from "../runtime/runtime-state.js"; export const MODULE_NAME = "st_bme"; export const GRAPH_METADATA_KEY = "st_bme_graph"; +export const GRAPH_COMMIT_MARKER_KEY = "st_bme_commit_marker"; export const GRAPH_PERSISTENCE_META_KEY = "__stBmePersistence"; export const GRAPH_LOAD_STATES = Object.freeze({ NO_CHAT: "no-chat", @@ -373,6 +374,155 @@ export function writeChatMetadataPatch(context, patch = {}) { return true; } +export function normalizeGraphCommitMarker(marker = null) { + if (!marker || typeof marker !== "object" || Array.isArray(marker)) { + return null; + } + + const revision = Number(marker.revision); + const lastProcessedAssistantFloor = Number(marker.lastProcessedAssistantFloor); + const extractionCount = Number(marker.extractionCount); + const nodeCount = Number(marker.nodeCount); + const edgeCount = Number(marker.edgeCount); + const archivedCount = Number(marker.archivedCount); + + return { + revision: Number.isFinite(revision) && revision > 0 ? revision : 0, + lastProcessedAssistantFloor: + Number.isFinite(lastProcessedAssistantFloor) + ? Math.floor(lastProcessedAssistantFloor) + : -1, + extractionCount: + Number.isFinite(extractionCount) && extractionCount >= 0 + ? Math.floor(extractionCount) + : 0, + nodeCount: + Number.isFinite(nodeCount) && nodeCount >= 0 ? Math.floor(nodeCount) : 0, + edgeCount: + Number.isFinite(edgeCount) && edgeCount >= 0 ? Math.floor(edgeCount) : 0, + archivedCount: + Number.isFinite(archivedCount) && archivedCount >= 0 + ? Math.floor(archivedCount) + : 0, + persistedAt: String(marker.persistedAt || ""), + storageTier: String(marker.storageTier || "none"), + accepted: marker.accepted === true, + reason: String(marker.reason || ""), + chatId: normalizeIdentityValue(marker.chatId), + integrity: normalizeIdentityValue(marker.integrity), + }; +} + +export function buildGraphCommitMarker( + graph, + { + revision = 0, + storageTier = "none", + accepted = false, + reason = "", + persistedAt = "", + chatId = "", + integrity = "", + lastProcessedAssistantFloor = null, + extractionCount = null, + } = {}, +) { + const stats = graph ? getGraphStats(graph) : null; + const historyState = graph?.historyState || {}; + const hasExplicitLastProcessedFloor = + lastProcessedAssistantFloor !== null && + lastProcessedAssistantFloor !== undefined && + lastProcessedAssistantFloor !== ""; + const hasExplicitExtractionCount = + extractionCount !== null && + extractionCount !== undefined && + extractionCount !== ""; + return normalizeGraphCommitMarker({ + revision, + lastProcessedAssistantFloor: + hasExplicitLastProcessedFloor && + Number.isFinite(Number(lastProcessedAssistantFloor)) + ? Number(lastProcessedAssistantFloor) + : Number.isFinite(Number(historyState.lastProcessedAssistantFloor)) + ? Number(historyState.lastProcessedAssistantFloor) + : Number.isFinite(Number(stats?.lastProcessedSeq)) + ? Number(stats.lastProcessedSeq) + : -1, + extractionCount: + hasExplicitExtractionCount && + Number.isFinite(Number(extractionCount)) + ? Number(extractionCount) + : Number.isFinite(Number(historyState.extractionCount)) + ? Number(historyState.extractionCount) + : 0, + nodeCount: Number(stats?.activeNodes || 0), + edgeCount: Number(stats?.totalEdges || 0), + archivedCount: Number(stats?.archivedNodes || 0), + persistedAt: String(persistedAt || new Date().toISOString()), + storageTier: String(storageTier || "none"), + accepted: accepted === true, + reason: String(reason || ""), + chatId, + integrity, + }); +} + +export function readGraphCommitMarker(context = null) { + const rawMarker = + context?.chatMetadata && + typeof context.chatMetadata === "object" && + !Array.isArray(context.chatMetadata) + ? context.chatMetadata[GRAPH_COMMIT_MARKER_KEY] + : null; + const marker = normalizeGraphCommitMarker(rawMarker); + return marker?.revision ? marker : null; +} + +export function getAcceptedCommitMarkerRevision(marker = null) { + const normalizedMarker = normalizeGraphCommitMarker(marker); + return normalizedMarker?.accepted === true + ? Number(normalizedMarker.revision || 0) + : 0; +} + +export function detectIndexedDbSnapshotCommitMarkerMismatch( + snapshot = null, + marker = null, +) { + const normalizedMarker = normalizeGraphCommitMarker(marker); + if (!normalizedMarker || normalizedMarker.accepted !== true) { + return { + mismatched: false, + reason: "", + markerRevision: 0, + snapshotRevision: Number.isFinite(Number(snapshot?.meta?.revision)) + ? Number(snapshot.meta.revision) + : 0, + }; + } + + const snapshotRevision = Number.isFinite(Number(snapshot?.meta?.revision)) + ? Number(snapshot.meta.revision) + : 0; + const markerRevision = Number(normalizedMarker.revision || 0); + if (markerRevision <= 0 || snapshotRevision >= markerRevision) { + return { + mismatched: false, + reason: "", + markerRevision, + snapshotRevision, + }; + } + + return { + mismatched: true, + reason: "persist-mismatch:indexeddb-behind-commit-marker", + markerRevision, + snapshotRevision, + marker: normalizedMarker, + }; +} + // ═══════════════════════════════════════════════════════════ // Shadow Snapshot(会话存储) // ═══════════════════════════════════════════════════════════ diff --git a/index.js b/index.js index bd1d1b4..e879a22 100644 --- a/index.js +++ b/index.js @@ -96,9 +96,13 @@ import { runHierarchicalSummaryPostProcess, } from "./maintenance/hierarchical-summary.js"; import { + buildGraphCommitMarker, + detectIndexedDbSnapshotCommitMarkerMismatch, findGraphShadowSnapshotByIntegrity, + getAcceptedCommitMarkerRevision, GRAPH_LOAD_PENDING_CHAT_ID, GRAPH_LOAD_STATES, + GRAPH_COMMIT_MARKER_KEY, GRAPH_METADATA_KEY, GRAPH_STARTUP_RECONCILE_DELAYS_MS, MODULE_NAME, @@ -110,6 +114,7 @@ import { readGraphShadowSnapshot, removeGraphShadowSnapshot, rememberGraphIdentityAlias, + readGraphCommitMarker, resolveGraphIdentityAliasByHostChatId, stampGraphPersistenceMeta, writeChatMetadataPatch, @@ -288,6 +293,157 @@ function getChatMetadataIntegrity(context = getContext()) { return normalizeChatIdCandidate(context?.chatMetadata?.integrity); } +function getChatCommitMarker(context = getContext()) { + return readGraphCommitMarker(context); +} + +function syncCommitMarkerToPersistenceState(context = getContext()) { + const marker = getChatCommitMarker(context); + updateGraphPersistenceState({ + commitMarker: cloneRuntimeDebugValue(marker, null), + lastAcceptedRevision: Math.max( + Number(graphPersistenceState.lastAcceptedRevision || 0), + getAcceptedCommitMarkerRevision(marker), + ), + }); + return marker; +} + +function persistGraphCommitMarker( + context = getContext(), + { + reason = "graph-commit-marker", + revision = graphPersistenceState.revision, + storageTier = "none", + accepted = false, + lastProcessedAssistantFloor = null, + extractionCount: nextExtractionCount = null, + immediate = true, + } = {}, +) { + if (!context) { + return buildGraphPersistResult({ + saved: false, + blocked: true, + accepted: false, + reason: "missing-context", + revision, + storageTier, + }); + } + + const chatId = getCurrentChatId(context); + if (!chatId) { + return buildGraphPersistResult({ + saved: false, + blocked: true, + accepted: false, + reason: "missing-chat-id", + revision, + storageTier, + }); + } + + const marker = buildGraphCommitMarker(currentGraph, { + revision, + storageTier, + accepted, + reason, + chatId, + integrity: getChatMetadataIntegrity(context), + lastProcessedAssistantFloor, + extractionCount: nextExtractionCount, + }); + if (!marker) { + return buildGraphPersistResult({ + saved: false, + blocked: true, + accepted: false, + reason: "marker-build-failed", + revision, + storageTier, + }); + } + + writeChatMetadataPatch(context, { + [GRAPH_COMMIT_MARKER_KEY]: marker, + }); + const saveMode = triggerChatMetadataSave(context, { immediate }); + updateGraphPersistenceState({ + commitMarker: cloneRuntimeDebugValue(marker, null), + lastAcceptedRevision: accepted + ? Math.max( + Number(graphPersistenceState.lastAcceptedRevision || 0), + Number(marker.revision || 0), + ) + : Number(graphPersistenceState.lastAcceptedRevision || 0), + lastPersistReason: String(reason || ""), + lastPersistMode: `commit-marker:${saveMode}`, + }); + return buildGraphPersistResult({ + saved: true, + blocked: false, + accepted, + reason, + revision: Number(marker.revision || revision || 0), + saveMode, + storageTier, + }); +} + +function applyPersistMismatchBlockedState( + chatId, + mismatch = null, + { source = "persist-mismatch", attemptIndex = 0 } = {}, +) { + const marker = cloneRuntimeDebugValue(mismatch?.marker, null) || getChatCommitMarker(); + const markerRevision = Number(mismatch?.markerRevision || 0); + const snapshotRevision = Number(mismatch?.snapshotRevision || 0); + const reason = String(mismatch?.reason || "persist-mismatch:indexeddb-behind-commit-marker"); + applyGraphLoadState(GRAPH_LOAD_STATES.BLOCKED, { + chatId, + reason: `${source}:${reason}`, + attemptIndex, + revision: Math.max(Number(graphPersistenceState.revision || 0), markerRevision), + lastPersistedRevision: Math.max( + Number(graphPersistenceState.lastPersistedRevision || 0), + snapshotRevision, + ), + pendingPersist: false, + dbReady: true, + writesBlocked: true, + }); + updateGraphPersistenceState({ + persistMismatchReason: reason, + commitMarker: cloneRuntimeDebugValue(marker, null), + lastAcceptedRevision: Math.max( + Number(graphPersistenceState.lastAcceptedRevision || 0), + markerRevision, + ), + dualWriteLastResult: { + action: "load", + source: String(source || "persist-mismatch"), + success: false, + rejected: true, + reason, + markerRevision, + snapshotRevision, + at: Date.now(), + }, + }); + refreshPanelLiveState(); + return { + success: false, + loaded: false, + loadState: GRAPH_LOAD_STATES.BLOCKED, + reason, + chatId, + attemptIndex, + markerRevision, + snapshotRevision, + }; +} + function triggerChatMetadataSave( context = getContext(), { immediate = false } = {}, @@ -552,6 +708,13 @@ function normalizeGraphSyncState(value = "idle") { } function getGraphPersistenceLiveState() { + const liveCommitMarker = + cloneRuntimeDebugValue(graphPersistenceState.commitMarker, null) || + readGraphCommitMarker(getContext()); + const lastAcceptedRevision = Math.max( + Number(graphPersistenceState.lastAcceptedRevision || 0), + getAcceptedCommitMarkerRevision(liveCommitMarker), + ); const snapshot = { loadState: graphPersistenceState.loadState, chatId: graphPersistenceState.chatId, @@ -570,6 +733,9 @@ function getGraphPersistenceLiveState() { metadataIntegrity: graphPersistenceState.metadataIntegrity, writesBlocked: graphPersistenceState.writesBlocked, pendingPersist: graphPersistenceState.pendingPersist, + lastAcceptedRevision, + persistMismatchReason: String(graphPersistenceState.persistMismatchReason || ""), + commitMarker: cloneRuntimeDebugValue(liveCommitMarker, null), queuedPersistMode: graphPersistenceState.queuedPersistMode, queuedPersistRotateIntegrity: graphPersistenceState.queuedPersistRotateIntegrity, @@ -3795,6 +3961,11 @@ function applyShadowSnapshotToRuntime( storageMode: "indexeddb", dbReady: true, indexedDbLastError: "", + persistMismatchReason: "", + lastAcceptedRevision: Math.max( + Number(graphPersistenceState.lastAcceptedRevision || 0), + shadowRevision, + ), metadataIntegrity: getChatMetadataIntegrity(getContext()) || graphPersistenceState.metadataIntegrity, @@ -4609,6 +4780,7 @@ function applyIndexedDbEmptyToRuntime( storagePrimary: "indexeddb", storageMode: "indexeddb", dbReady: true, + persistMismatchReason: "", indexedDbRevision: 0, indexedDbLastError: "", dualWriteLastResult: { @@ -4638,6 +4810,7 @@ function applyIndexedDbSnapshotToRuntime( { source = "indexeddb", attemptIndex = 0 } = {}, ) { const normalizedChatId = normalizeChatIdCandidate(chatId); + syncCommitMarkerToPersistenceState(getContext()); if (!normalizedChatId || !isIndexedDbSnapshotMeaningful(snapshot)) { return { success: false, @@ -4802,11 +4975,16 @@ function applyIndexedDbSnapshotToRuntime( storagePrimary: "indexeddb", storageMode: "indexeddb", dbReady: true, + persistMismatchReason: "", indexedDbRevision: revision, metadataIntegrity: getChatMetadataIntegrity(getContext()) || - graphPersistenceState.metadataIntegrity, + graphPersistenceState.metadataIntegrity, indexedDbLastError: "", + lastAcceptedRevision: Math.max( + Number(graphPersistenceState.lastAcceptedRevision || 0), + revision, + ), lastSyncError: "", dualWriteLastResult: { action: "load", @@ -4851,6 +5029,7 @@ async function loadGraphFromIndexedDb( } = {}, ) { const normalizedChatId = normalizeChatIdCandidate(chatId); + const commitMarker = syncCommitMarkerToPersistenceState(getContext()); if (!normalizedChatId) { return { success: false, @@ -4974,7 +5153,38 @@ async function loadGraphFromIndexedDb( cacheIndexedDbSnapshot(normalizedChatId, snapshot); + const commitMarkerMismatch = detectIndexedDbSnapshotCommitMarkerMismatch( + snapshot, + commitMarker, + ); if (!isIndexedDbSnapshotMeaningful(snapshot)) { + if (commitMarkerMismatch.mismatched) { + if ( + shadowSnapshot && + Number(shadowSnapshot.revision || 0) >= + Number(commitMarkerMismatch.markerRevision || 0) + ) { + const shadowRestoreResult = applyShadowSnapshotToRuntime( + normalizedChatId, + shadowSnapshot, + { + source: `${source}:shadow-indexeddb-empty`, + attemptIndex, + }, + ); + if (shadowRestoreResult?.loaded) { + return shadowRestoreResult; + } + } + return applyPersistMismatchBlockedState( + normalizedChatId, + commitMarkerMismatch, + { + source: `${source}:indexeddb-empty`, + attemptIndex, + }, + ); + } if (shadowSnapshot) { const shadowRestoreResult = applyShadowSnapshotToRuntime( normalizedChatId, @@ -5039,6 +5249,33 @@ async function loadGraphFromIndexedDb( }, ); } + if (commitMarkerMismatch.mismatched) { + if ( + shadowSnapshot && + Number(shadowSnapshot.revision || 0) >= + Number(commitMarkerMismatch.markerRevision || 0) + ) { + return applyShadowSnapshotToRuntime( + normalizedChatId, + shadowSnapshot, + { + source: `${source}:shadow-beats-commit-marker`, + attemptIndex, + }, + ); + } + return applyPersistMismatchBlockedState( + normalizedChatId, + { + ...commitMarkerMismatch, + marker: commitMarkerMismatch.marker || commitMarker, + }, + { + source: `${source}:indexeddb-commit-marker`, + attemptIndex, + }, + ); + } const shouldAllowOverride = allowOverride || BME_INDEXEDDB_FALLBACK_LOAD_STATE_SET.has( @@ -5634,6 +5871,8 @@ function buildGraphPersistResult({ saved = false, queued = false, blocked = false, + accepted = false, + storageTier = "none", reason = "", loadState = graphPersistenceState.loadState, revision = graphPersistenceState.revision, @@ -5643,6 +5882,8 @@ function buildGraphPersistResult({ saved, queued, blocked, + accepted, + storageTier: String(storageTier || "none"), reason: String(reason || ""), loadState, revision: Number.isFinite(revision) ? revision : 0, @@ -5730,6 +5971,7 @@ function persistGraphToChatMetadata( lastPersistReason: String(reason || ""), lastPersistMode: saveMode, metadataIntegrity: String(nextIntegrity || ""), + persistMismatchReason: "", storagePrimary: "metadata", storageMode: "metadata", indexedDbLastError: "", @@ -5742,10 +5984,12 @@ function persistGraphToChatMetadata( return buildGraphPersistResult({ saved: true, + accepted: true, reason, loadState: graphPersistenceState.loadState, revision, saveMode, + storageTier: "metadata-full", }); } @@ -5755,7 +5999,7 @@ function queueGraphPersist( { immediate = true } = {}, ) { const queuedChatId = graphPersistenceState.chatId || getCurrentChatId(); - maybeCaptureGraphShadowSnapshot(reason); + const shadowCaptured = maybeCaptureGraphShadowSnapshot(reason); updateGraphPersistenceState({ queuedPersistRevision: Math.max( graphPersistenceState.queuedPersistRevision || 0, @@ -5773,10 +6017,12 @@ function queueGraphPersist( return buildGraphPersistResult({ queued: true, blocked: true, + accepted: shadowCaptured, reason, loadState: graphPersistenceState.loadState, revision, saveMode: immediate ? "immediate" : "debounced", + storageTier: shadowCaptured ? "shadow" : "none", }); } @@ -5832,6 +6078,158 @@ function maybeFlushQueuedGraphPersist(reason = "queued-graph-persist") { }); } +async function persistExtractionBatchResult({ + reason = "extraction-batch-complete", + lastProcessedAssistantFloor = null, +} = {}) { + ensureCurrentGraphRuntimeState(); + const context = getContext(); + if (!context || !currentGraph) { + return buildGraphPersistResult({ + saved: false, + blocked: true, + accepted: false, + reason: "missing-context-or-graph", + storageTier: "none", + }); + } + + const chatId = getCurrentChatId(context); + if (!chatId) { + return buildGraphPersistResult({ + saved: false, + blocked: true, + accepted: false, + reason: "missing-chat-id", + storageTier: "none", + }); + } + + const revision = bumpGraphRevision(reason); + const indexedDbResult = await saveGraphToIndexedDb(chatId, currentGraph, { + revision, + reason, + }); + if (indexedDbResult?.saved) { + persistGraphCommitMarker(context, { + reason, + revision, + storageTier: "indexeddb", + accepted: true, + lastProcessedAssistantFloor, + extractionCount, + immediate: true, + }); + updateGraphPersistenceState({ + pendingPersist: false, + persistMismatchReason: "", + lastAcceptedRevision: Math.max( + Number(graphPersistenceState.lastAcceptedRevision || 0), + revision, + ), + lastPersistReason: String(reason || ""), + lastPersistMode: "indexeddb", + }); + return buildGraphPersistResult({ + saved: true, + accepted: true, + reason, + revision, + saveMode: "indexeddb", + storageTier: "indexeddb", + }); + } + + const shadowReason = `${reason}:shadow-fallback`; + const shadowCaptured = maybeCaptureGraphShadowSnapshot(shadowReason); + if (shadowCaptured) { + if (isGraphMetadataWriteAllowed()) { + persistGraphCommitMarker(context, { + reason: shadowReason, + revision, + storageTier: "shadow", + accepted: true, + lastProcessedAssistantFloor, + extractionCount, + immediate: true, + }); + } + updateGraphPersistenceState({ + pendingPersist: false, + persistMismatchReason: "", + lastAcceptedRevision: Math.max( + Number(graphPersistenceState.lastAcceptedRevision || 0), + revision, + ), + lastPersistReason: shadowReason, + lastPersistMode: "shadow", + }); + return buildGraphPersistResult({ + saved: false, + accepted: true, + reason: shadowReason, + revision, + saveMode: "shadow", + storageTier: "shadow", + }); + } + + if (isGraphMetadataWriteAllowed()) { + const metadataReason = `${reason}:metadata-full-fallback`; + const metadataResult = persistGraphToChatMetadata(context, { + reason: metadataReason, + revision, + immediate: true, + }); + if (metadataResult?.saved) { + persistGraphCommitMarker(context, { + reason: metadataReason, + revision, + storageTier: "metadata-full", + accepted: true, + lastProcessedAssistantFloor, + extractionCount, + immediate: true, + }); + updateGraphPersistenceState({ + pendingPersist: false, + persistMismatchReason: "", + lastAcceptedRevision: Math.max( + Number(graphPersistenceState.lastAcceptedRevision || 0), + revision, + ), + }); + return buildGraphPersistResult({ + saved: true, + accepted: true, + reason: metadataReason, + revision, + saveMode: metadataResult.saveMode, + storageTier: "metadata-full", + }); + } + } + + const queuedResult = queueGraphPersist(`${reason}:pending`, revision, { + immediate: true, + }); + updateGraphPersistenceState({ + pendingPersist: true, + lastPersistReason: String(queuedResult.reason || `${reason}:pending`), + lastPersistMode: String(queuedResult.saveMode || ""), + }); + return buildGraphPersistResult({ + saved: false, + queued: Boolean(queuedResult?.queued), + blocked: Boolean(queuedResult?.blocked), + accepted: false, + reason: String(queuedResult?.reason || `${reason}:pending`), + revision, + saveMode: String(queuedResult?.saveMode || ""), + storageTier: String(queuedResult?.storageTier || "none"), + }); +} + function scheduleGraphLoadRetry( chatId, reason = "metadata-pending", @@ -5920,6 +6318,7 @@ function shouldSyncGraphLoadFromLiveContext( function syncGraphLoadFromLiveContext(options = {}) { const { source = "live-context-sync", force = false } = options; const context = getContext(); + syncCommitMarkerToPersistenceState(context); if (!shouldSyncGraphLoadFromLiveContext(context, { force })) { return { synced: false, @@ -6505,6 +6904,12 @@ function updateProcessedHistorySnapshot(chat, lastProcessedAssistantFloor) { function shouldAdvanceProcessedHistory(batchStatus) { if (!batchStatus || typeof batchStatus !== "object") return false; + if (batchStatus.historyAdvanceAllowed === true) { + return true; + } + if (batchStatus.historyAdvanceAllowed === false) { + return false; + } return ( batchStatus?.stages?.core?.outcome === "success" && batchStatus?.stages?.finalize?.outcome === "success" && @@ -6840,6 +7245,7 @@ function loadGraphFromChat(options = {}) { const context = getContext(); const chatIdentity = resolveCurrentChatIdentity(context); const chatId = chatIdentity.chatId; + const commitMarker = syncCommitMarkerToPersistenceState(context); const shadowSnapshot = resolveCompatibleGraphShadowSnapshot(chatIdentity); const normalizedExpectedChatId = String(expectedChatId || ""); if (attemptIndex === 0) { @@ -6999,6 +7405,13 @@ function loadGraphFromChat(options = {}) { 1, getGraphPersistedRevision(officialGraph), ); + const metadataCommitMismatch = detectIndexedDbSnapshotCommitMarkerMismatch( + buildSnapshotFromGraph(officialGraph, { + chatId, + revision: officialRevision, + }), + commitMarker, + ); const officialRuntimeStaleDecision = detectStaleIndexedDbSnapshotAgainstRuntime( chatId, @@ -7045,6 +7458,28 @@ function loadGraphFromChat(options = {}) { }; } + if (metadataCommitMismatch.mismatched) { + clearPendingGraphLoadRetry(); + if ( + shadowSnapshot && + Number(shadowSnapshot.revision || 0) >= + Number(metadataCommitMismatch.markerRevision || 0) + ) { + return applyShadowSnapshotToRuntime(chatId, shadowSnapshot, { + source: `${source}:metadata-shadow`, + attemptIndex, + }); + } + return applyPersistMismatchBlockedState( + chatId, + metadataCommitMismatch, + { + source: `${source}:metadata-compat`, + attemptIndex, + }, + ); + } + if (shadowSnapshot && shadowDecision?.reason) { updateGraphPersistenceState({ dualWriteLastResult: { @@ -7171,6 +7606,27 @@ function loadGraphFromChat(options = {}) { } if (shadowSnapshot) { + const acceptedCommitRevision = getAcceptedCommitMarkerRevision(commitMarker); + if ( + acceptedCommitRevision > 0 && + Number(shadowSnapshot.revision || 0) < acceptedCommitRevision + ) { + clearPendingGraphLoadRetry(); + return applyPersistMismatchBlockedState( + chatId, + { + mismatched: true, + reason: "persist-mismatch:indexeddb-behind-commit-marker", + markerRevision: acceptedCommitRevision, + snapshotRevision: Number(shadowSnapshot.revision || 0), + marker: commitMarker, + }, + { + source: `${source}:shadow-no-official`, + attemptIndex, + }, + ); + } clearPendingGraphLoadRetry(); return applyShadowSnapshotToRuntime(chatId, shadowSnapshot, { source: `${source}:shadow-no-official`, @@ -7276,10 +7732,12 @@ async function saveGraphToIndexedDb( storagePrimary: "indexeddb", storageMode: "indexeddb", dbReady: true, + lastPersistedRevision: snapshot.meta.revision, + pendingPersist: false, indexedDbRevision: snapshot.meta.revision, metadataIntegrity: getChatMetadataIntegrity(getContext()) || - graphPersistenceState.metadataIntegrity, + graphPersistenceState.metadataIntegrity, indexedDbLastError: "", lastSyncError: "", dualWriteLastResult: { @@ -7505,14 +7963,16 @@ function saveGraphToChat(options = {}) { }, }); return buildGraphPersistResult({ - saved: Boolean(shouldQueueIndexedDbPersist), - queued: false, + saved: false, + queued: Boolean(shouldQueueIndexedDbPersist), blocked: false, + accepted: false, reason: shouldQueueIndexedDbPersist ? "indexeddb-queued" : "indexeddb-empty-skip", revision, saveMode, + storageTier: shouldQueueIndexedDbPersist ? "indexeddb" : "none", }); } @@ -9415,7 +9875,7 @@ async function executeExtractionBatch({ getLastProcessedAssistantFloor, getSchema, handleExtractionSuccess, - saveGraphToChat, + persistExtractionBatchResult, setBatchStageOutcome, setLastExtractionStatus, shouldAdvanceProcessedHistory, diff --git a/maintenance/extraction-controller.js b/maintenance/extraction-controller.js index 8acd4b3..96fd0d0 100644 --- a/maintenance/extraction-controller.js +++ b/maintenance/extraction-controller.js @@ -54,6 +54,68 @@ function normalizeSmartTriggerDecision(decision = null) { }; } +function normalizePersistenceStateRecord(persistResult = null) { + const accepted = persistResult?.accepted === true; + const queued = persistResult?.queued === true; + const blocked = persistResult?.blocked === true; + let outcome = "failed"; + if (accepted && String(persistResult?.storageTier || "") === "indexeddb") { + outcome = "saved"; + } else if (accepted) { + outcome = "fallback"; + } else if (queued) { + outcome = "queued"; + } else if (blocked) { + outcome = "blocked"; + } + + return { + outcome, + accepted, + storageTier: String(persistResult?.storageTier || "none"), + reason: String(persistResult?.reason || ""), + revision: Number.isFinite(Number(persistResult?.revision)) + ? Number(persistResult.revision) + : 0, + saveMode: String(persistResult?.saveMode || ""), + saved: persistResult?.saved === true, + queued, + blocked, + }; +} + +function getPendingPersistenceGateInfo(runtime) { + const graph = runtime?.getCurrentGraph?.(); + const batchStatus = graph?.historyState?.lastBatchStatus || null; + const persistence = batchStatus?.persistence || null; + const pendingPersist = runtime?.getGraphPersistenceState?.()?.pendingPersist === true; + const accepted = persistence?.accepted === true; + if (!pendingPersist && (!persistence || accepted)) { + return null; + } + + return { + pendingPersist, + accepted, + outcome: String(persistence?.outcome || ""), + reason: String(persistence?.reason || ""), + revision: Number.isFinite(Number(persistence?.revision)) + ? Number(persistence.revision) + : 0, + }; +} + +function formatPendingPersistenceGateMessage(runtime, operationLabel = "当前提取") { + const gate = getPendingPersistenceGateInfo(runtime); + if (!gate) return ""; + const reason = gate.reason ? ` · ${gate.reason}` : ""; + const revision = + Number.isFinite(Number(gate.revision)) && Number(gate.revision) > 0 + ? ` · rev ${Number(gate.revision)}` + : ""; + return `${operationLabel}已暂停:上一批持久化尚未确认,请优先重试持久化或触发恢复${revision}${reason}`; +} + export function resolveAutoExtractionPlanController( runtime, { @@ -255,24 +317,13 @@ export async function executeExtractionBatchController( batchStatus, ); const batchStatusRef = effects?.batchStatus || batchStatus; - const persistResult = runtime.saveGraphToChat({ + const persistResult = await runtime.persistExtractionBatchResult({ reason: "extraction-batch-complete", - persistMetadata: true, - captureShadow: true, - immediate: true, + lastProcessedAssistantFloor: endIdx, }); - const persistAccepted = Boolean( - persistResult?.saved || persistResult?.queued, - ); - - if (!persistAccepted) { - runtime.setBatchStageOutcome( - batchStatusRef, - "finalize", - "failed", - `图谱持久化失败: ${persistResult?.reason || "unknown-persist-failure"}`, - ); - } + const persistence = normalizePersistenceStateRecord(persistResult); + batchStatusRef.persistence = persistence; + batchStatusRef.historyAdvanceAllowed = persistence.accepted === true; const finalizedBatchStatus = runtime.finalizeBatchStatus( batchStatusRef, runtime.getExtractionCount(), @@ -280,33 +331,26 @@ export async function executeExtractionBatchController( runtime.getCurrentGraph().historyState.lastBatchStatus = { ...finalizedBatchStatus, - historyAdvanced: runtime.shouldAdvanceProcessedHistory(finalizedBatchStatus), - persist: persistResult - ? { - saved: Boolean(persistResult.saved), - queued: Boolean(persistResult.queued), - blocked: Boolean(persistResult.blocked), - reason: String(persistResult.reason || ""), - saveMode: String(persistResult.saveMode || ""), - revision: Number.isFinite(Number(persistResult.revision)) - ? Number(persistResult.revision) - : 0, - } - : null, + persistence, + historyAdvanceAllowed: persistence.accepted === true, + historyAdvanced: runtime.shouldAdvanceProcessedHistory({ + ...finalizedBatchStatus, + historyAdvanceAllowed: persistence.accepted === true, + }), }; if (runtime.getCurrentGraph().historyState.lastBatchStatus.historyAdvanced) { runtime.updateProcessedHistorySnapshot(chat, endIdx); - } else if (!persistAccepted) { + } else if (!persistence.accepted) { runtime.setLastExtractionStatus( "提取待恢复", - `楼层 ${startIdx}-${endIdx} 已抽取但未确认写盘成功,请稍后重试或检查持久化状态`, + `楼层 ${startIdx}-${endIdx} 已抽取,但持久化状态为 ${persistence.outcome || "failed"}${persistence.reason ? ` · ${persistence.reason}` : ""}`, "warning", { syncRuntime: true }, ); runtime.console?.warn?.("[ST-BME] extraction persist not accepted", { chatId: runtime.getGraphPersistenceState?.()?.chatId || "", - persist: persistResult, + persistence, processedRange: [startIdx, endIdx], }); } @@ -330,8 +374,13 @@ export async function executeExtractionBatchController( return { success: finalizedBatchStatus.completed, result, - effects, + effects: { + ...(effects || {}), + persistResult, + }, batchStatus: finalizedBatchStatus, + persistResult, + historyAdvanceAllowed: persistence.accepted === true, error: finalizedBatchStatus.completed ? "" : effects?.vectorError || @@ -381,6 +430,27 @@ export async function runExtractionController(runtime, options = {}) { return; } + const pendingPersistMessage = formatPendingPersistenceGateMessage( + runtime, + "自动提取", + ); + if (pendingPersistMessage) { + runtime.console?.debug?.("[ST-BME] auto extraction paused: pending persistence", { + persistence: runtime.getCurrentGraph?.()?.historyState?.lastBatchStatus?.persistence || null, + }); + runtime.deferAutoExtraction?.("pending-persist", { + targetEndFloor: deferredTargetEndFloor, + strategy: plan.strategy, + }); + runtime.setLastExtractionStatus( + "等待持久化确认", + pendingPersistMessage, + "warning", + { syncRuntime: true }, + ); + return; + } + if (!runtime.getCurrentGraph()) { runtime.ensureCurrentGraphRuntimeState?.(); } @@ -436,12 +506,22 @@ export async function runExtractionController(runtime, options = {}) { return; } - runtime.setLastExtractionStatus( - "提取完成", - `楼层 ${startIdx}-${endIdx} · 新建 ${batchResult.result?.newNodes || 0} · 更新 ${batchResult.result?.updatedNodes || 0} · 新边 ${batchResult.result?.newEdges || 0}`, - "success", - { syncRuntime: true }, - ); + const persistence = batchResult.batchStatus?.persistence || null; + if (batchResult.historyAdvanceAllowed === false) { + runtime.setLastExtractionStatus( + "提取完成,持久化待确认", + `楼层 ${startIdx}-${endIdx} · 新建 ${batchResult.result?.newNodes || 0} · 更新 ${batchResult.result?.updatedNodes || 0} · 新边 ${batchResult.result?.newEdges || 0}${persistence?.reason ? ` · ${persistence.reason}` : ""}`, + "warning", + { syncRuntime: true }, + ); + } else { + runtime.setLastExtractionStatus( + "提取完成", + `楼层 ${startIdx}-${endIdx} · 新建 ${batchResult.result?.newNodes || 0} · 更新 ${batchResult.result?.updatedNodes || 0} · 新边 ${batchResult.result?.newEdges || 0}`, + "success", + { syncRuntime: true }, + ); + } } catch (e) { if (runtime.isAbortError(e)) { runtime.setLastExtractionStatus( @@ -468,6 +548,22 @@ export async function onManualExtractController(runtime, options = {}) { return; } if (!runtime.ensureGraphMutationReady("手动提取")) return; + const pendingPersistMessage = formatPendingPersistenceGateMessage( + runtime, + "手动提取", + ); + if (pendingPersistMessage) { + runtime.setLastExtractionStatus( + "等待持久化确认", + pendingPersistMessage, + "warning", + { + syncRuntime: true, + }, + ); + runtime.toastr.warning("上一批持久化尚未确认,请先重试持久化或执行恢复"); + return; + } if (!(await runtime.recoverHistoryIfNeeded("manual-extract"))) return; if (!runtime.getCurrentGraph()) { runtime.setCurrentGraph( @@ -548,6 +644,14 @@ export async function onManualExtractController(runtime, options = {}) { warnings.push(...batchResult.effects.warnings); } + if (batchResult.historyAdvanceAllowed === false) { + warnings.push( + batchResult.batchStatus?.persistence?.reason || + "当前批次持久化尚未确认", + ); + break; + } + if (options?.drainAll === false) { break; } @@ -566,19 +670,36 @@ export async function onManualExtractController(runtime, options = {}) { return; } - runtime.toastr.success( - `提取完成:${totals.batches} 批,新建 ${totals.newNodes},更新 ${totals.updatedNodes},新边 ${totals.newEdges}`, - ); - runtime.setLastExtractionStatus( - "手动提取完成", - `${totals.batches} 批 · 新建 ${totals.newNodes} · 更新 ${totals.updatedNodes} · 新边 ${totals.newEdges}`, - "success", - { - syncRuntime: true, - toastKind: "success", - toastTitle: "ST-BME 手动提取", - }, - ); + const pendingAfterRun = getPendingPersistenceGateInfo(runtime); + if (pendingAfterRun) { + runtime.toastr.warning( + `提取完成但持久化待确认:${pendingAfterRun.reason || pendingAfterRun.outcome || "unknown"}`, + ); + runtime.setLastExtractionStatus( + "手动提取完成,持久化待确认", + `${totals.batches} 批 · 新建 ${totals.newNodes} · 更新 ${totals.updatedNodes} · 新边 ${totals.newEdges}${pendingAfterRun.reason ? ` · ${pendingAfterRun.reason}` : ""}`, + "warning", + { + syncRuntime: true, + toastKind: "", + toastTitle: "ST-BME 手动提取", + }, + ); + } else { + runtime.toastr.success( + `提取完成:${totals.batches} 批,新建 ${totals.newNodes},更新 ${totals.updatedNodes},新边 ${totals.newEdges}`, + ); + runtime.setLastExtractionStatus( + "手动提取完成", + `${totals.batches} 批 · 新建 ${totals.newNodes} · 更新 ${totals.updatedNodes} · 新边 ${totals.newEdges}`, + "success", + { + syncRuntime: true, + toastKind: "success", + toastTitle: "ST-BME 手动提取", + }, + ); + } if (warnings.length > 0) { runtime.toastr.warning(warnings.slice(0, 2).join(";"), "ST-BME 提取警告", { timeOut: 5000, diff --git a/runtime/runtime-state.js b/runtime/runtime-state.js index de2e260..59e0570 100644 --- a/runtime/runtime-state.js +++ b/runtime/runtime-state.js @@ -124,12 +124,55 @@ export function normalizeGraphRuntimeState(graph, chatId = "") { Array.isArray(historyState.lastBatchStatus) ) { historyState.lastBatchStatus = null; - } else if ( - typeof historyState.lastBatchStatus.historyAdvanced !== "boolean" - ) { + } else { historyState.lastBatchStatus = { ...historyState.lastBatchStatus, - historyAdvanced: false, + historyAdvanced: + historyState.lastBatchStatus.historyAdvanced === true, + historyAdvanceAllowed: + historyState.lastBatchStatus.historyAdvanceAllowed === true, + persistence: + historyState.lastBatchStatus.persistence && + typeof historyState.lastBatchStatus.persistence === "object" && + !Array.isArray(historyState.lastBatchStatus.persistence) + ? { + outcome: String( + historyState.lastBatchStatus.persistence.outcome || "queued", + ), + accepted: + historyState.lastBatchStatus.persistence.accepted === true, + storageTier: String( + historyState.lastBatchStatus.persistence.storageTier || "none", + ), + reason: String( + historyState.lastBatchStatus.persistence.reason || "", + ), + revision: Number.isFinite( + Number(historyState.lastBatchStatus.persistence.revision), + ) + ? Number(historyState.lastBatchStatus.persistence.revision) + : 0, + saveMode: String( + historyState.lastBatchStatus.persistence.saveMode || "", + ), + saved: + historyState.lastBatchStatus.persistence.saved === true, + queued: + historyState.lastBatchStatus.persistence.queued === true, + blocked: + historyState.lastBatchStatus.persistence.blocked === true, + } + : { + outcome: "queued", + accepted: false, + storageTier: "none", + reason: "", + revision: 0, + saveMode: "", + saved: false, + queued: false, + blocked: false, + }, }; } if (typeof historyState.lastExtractedRegion !== "string") { diff --git a/tests/extraction-persistence-gating.mjs b/tests/extraction-persistence-gating.mjs new file mode 100644 index 0000000..afbd3bc --- /dev/null +++ b/tests/extraction-persistence-gating.mjs @@ -0,0 +1,155 @@ +import assert from "node:assert/strict"; + +import { executeExtractionBatchController } from "../maintenance/extraction-controller.js"; +import { + createBatchStatusSkeleton, + finalizeBatchStatus, + setBatchStageOutcome, +} from "../ui/ui-status.js"; + +function createRuntime(persistResult) { + const graph = { + nodes: [], + edges: [], + historyState: {}, + }; + let processedHistoryUpdates = 0; + + return { + graph, + processedHistoryUpdates, + ensureCurrentGraphRuntimeState() {}, + throwIfAborted() {}, + getCurrentGraph() { + return graph; + }, + getLastProcessedAssistantFloor() { + return 4; + }, + getExtractionCount() { + return 6; + }, + cloneGraphSnapshot(value) { + return JSON.parse(JSON.stringify(value)); + }, + buildExtractionMessages() { + return [{ seq: 5, role: "assistant", content: "测试消息" }]; + }, + createBatchStatusSkeleton, + async extractMemories() { + return { + success: true, + newNodes: 1, + updatedNodes: 0, + newEdges: 0, + newNodeIds: ["node-1"], + processedRange: [5, 5], + }; + }, + getSchema() { + return []; + }, + getEmbeddingConfig() { + return null; + }, + setLastExtractionStatus() {}, + setBatchStageOutcome, + async handleExtractionSuccess(result, _endIdx, _settings, _signal, batchStatus) { + setBatchStageOutcome(batchStatus, "finalize", "success"); + return { + postProcessArtifacts: [], + vectorHashesInserted: [], + warnings: [], + batchStatus, + }; + }, + async persistExtractionBatchResult() { + return persistResult; + }, + finalizeBatchStatus, + shouldAdvanceProcessedHistory(batchStatus) { + return batchStatus.historyAdvanceAllowed === true; + }, + updateProcessedHistorySnapshot() { + processedHistoryUpdates += 1; + }, + appendBatchJournal() {}, + createBatchJournalEntry() { + return { id: "journal-1" }; + }, + computePostProcessArtifacts() { + return []; + }, + getGraphPersistenceState() { + return { chatId: "chat-test" }; + }, + console, + get processedHistoryUpdates() { + return processedHistoryUpdates; + }, + }; +} + +{ + const runtime = createRuntime({ + saved: false, + queued: true, + blocked: true, + accepted: false, + reason: "persist-queued", + revision: 7, + saveMode: "immediate", + storageTier: "none", + }); + const result = await executeExtractionBatchController(runtime, { + chat: [{ is_user: false, mes: "测试" }], + startIdx: 5, + endIdx: 5, + settings: {}, + }); + + assert.equal(result.success, true); + assert.equal(result.historyAdvanceAllowed, false); + assert.equal(runtime.processedHistoryUpdates, 0); + assert.equal( + runtime.graph.historyState.lastBatchStatus.persistence.outcome, + "queued", + ); + assert.equal( + runtime.graph.historyState.lastBatchStatus.historyAdvanceAllowed, + false, + ); +} + +{ + const runtime = createRuntime({ + saved: true, + queued: false, + blocked: false, + accepted: true, + reason: "indexeddb", + revision: 8, + saveMode: "indexeddb", + storageTier: "indexeddb", + }); + const result = await executeExtractionBatchController(runtime, { + chat: [{ is_user: false, mes: "测试" }], + startIdx: 5, + endIdx: 5, + settings: {}, + }); + + assert.equal(result.success, true); + assert.equal(result.historyAdvanceAllowed, true); + assert.equal(runtime.processedHistoryUpdates, 1); + assert.equal( + runtime.graph.historyState.lastBatchStatus.persistence.outcome, + "saved", + ); + assert.equal( + runtime.graph.historyState.lastBatchStatus.historyAdvanceAllowed, + true, + ); +} + +console.log("extraction-persistence-gating tests passed"); diff --git a/tests/graph-persistence.mjs b/tests/graph-persistence.mjs index 5c7dce4..0d68f15 100644 --- a/tests/graph-persistence.mjs +++ b/tests/graph-persistence.mjs @@ -11,12 +11,16 @@ import { } from "../sync/bme-db.js"; import { onMessageReceivedController } from "../host/event-binding.js"; import { + buildGraphCommitMarker, + detectIndexedDbSnapshotCommitMarkerMismatch, cloneGraphForPersistence, cloneRuntimeDebugValue, findGraphShadowSnapshotByIntegrity, + getAcceptedCommitMarkerRevision, getGraphPersistedRevision, getGraphIdentityAliasCandidates, getGraphPersistenceMeta, + GRAPH_COMMIT_MARKER_KEY, getGraphShadowSnapshotStorageKey, GRAPH_LOAD_PENDING_CHAT_ID, GRAPH_IDENTITY_ALIAS_STORAGE_KEY, @@ -27,6 +31,8 @@ import { GRAPH_SHADOW_SNAPSHOT_STORAGE_PREFIX, GRAPH_STARTUP_RECONCILE_DELAYS_MS, MODULE_NAME, + normalizeGraphCommitMarker, + readGraphCommitMarker, readGraphShadowSnapshot, rememberGraphIdentityAlias, removeGraphShadowSnapshot, @@ -384,11 +390,15 @@ async function createGraphPersistenceHarness({ formatRecallContextLine, readPersistedRecallFromUserMessage, cloneGraphForPersistence, + buildGraphCommitMarker, cloneRuntimeDebugValue, + detectIndexedDbSnapshotCommitMarkerMismatch, onMessageReceivedController, + getAcceptedCommitMarkerRevision, getGraphPersistenceMeta, getGraphPersistedRevision, getGraphIdentityAliasCandidates, + GRAPH_COMMIT_MARKER_KEY, getGraphShadowSnapshotStorageKey, GRAPH_IDENTITY_ALIAS_STORAGE_KEY, GRAPH_LOAD_PENDING_CHAT_ID, @@ -400,6 +410,8 @@ async function createGraphPersistenceHarness({ GRAPH_STARTUP_RECONCILE_DELAYS_MS, MODULE_NAME, findGraphShadowSnapshotByIntegrity, + normalizeGraphCommitMarker, + readGraphCommitMarker, readGraphShadowSnapshot, rememberGraphIdentityAlias, removeGraphShadowSnapshot, @@ -1221,8 +1233,8 @@ result = { reason: "blocked-save", markMutation: false, }); - assert.equal(result.saved, true); - assert.equal(result.queued, false); + assert.equal(result.saved, false); + assert.equal(result.queued, true); assert.equal(result.blocked, false); assert.equal(result.saveMode, "indexeddb-queued"); assert.equal(harness.runtimeContext.__chatContext.chatMetadata, undefined); @@ -1941,7 +1953,8 @@ result = { reason: "first-meaningful-graph", }); - assert.equal(result.saved, true); + assert.equal(result.saved, false); + assert.equal(result.queued, true); assert.equal(result.saveMode, "indexeddb-queued"); assert.equal(harness.runtimeContext.__contextImmediateSaveCalls, 0); assert.equal(harness.runtimeContext.__contextSaveCalls, 0); diff --git a/tests/persistence-commit-marker.mjs b/tests/persistence-commit-marker.mjs new file mode 100644 index 0000000..5e98ceb --- /dev/null +++ b/tests/persistence-commit-marker.mjs @@ -0,0 +1,91 @@ +import assert from "node:assert/strict"; + +import { + buildGraphCommitMarker, + detectIndexedDbSnapshotCommitMarkerMismatch, + getAcceptedCommitMarkerRevision, + GRAPH_COMMIT_MARKER_KEY, + normalizeGraphCommitMarker, + readGraphCommitMarker, + writeChatMetadataPatch, +} from "../graph/graph-persistence.js"; +import { addNode, createEmptyGraph, createNode } from "../graph/graph.js"; + +const graph = createEmptyGraph(); +graph.historyState.chatId = "chat-marker"; +graph.historyState.lastProcessedAssistantFloor = 10; +graph.historyState.extractionCount = 4; +addNode( + graph, + createNode({ + type: "event", + fields: { title: "事件A", summary: "测试事件" }, + seq: 10, + }), +); + +const marker = buildGraphCommitMarker(graph, { + revision: 12, + storageTier: "indexeddb", + accepted: true, + reason: "unit-test", +}); +assert.equal(marker.revision, 12); +assert.equal(marker.accepted, true); +assert.equal(marker.lastProcessedAssistantFloor, 10); +assert.equal(marker.extractionCount, 4); +assert.equal(marker.nodeCount, 1); +assert.equal(marker.edgeCount, 0); +assert.equal(marker.archivedCount, 0); +assert.equal(getAcceptedCommitMarkerRevision(marker), 12); + +const normalized = normalizeGraphCommitMarker({ + revision: "15", + lastProcessedAssistantFloor: "18", + extractionCount: "6", + nodeCount: "9", + edgeCount: "3", + archivedCount: "2", + storageTier: "shadow", + accepted: true, + reason: "normalized", +}); +assert.equal(normalized.revision, 15); +assert.equal(normalized.lastProcessedAssistantFloor, 18); +assert.equal(normalized.storageTier, "shadow"); + +const context = { + chatMetadata: {}, +}; +writeChatMetadataPatch(context, { + [GRAPH_COMMIT_MARKER_KEY]: marker, +}); +assert.deepEqual(readGraphCommitMarker(context), marker); + +const mismatch = detectIndexedDbSnapshotCommitMarkerMismatch( + { + meta: { + revision: 9, + }, + }, + marker, +); +assert.equal(mismatch.mismatched, true); +assert.equal( + mismatch.reason, + "persist-mismatch:indexeddb-behind-commit-marker", +); +assert.equal(mismatch.markerRevision, 12); +assert.equal(mismatch.snapshotRevision, 9); + +const noMismatch = detectIndexedDbSnapshotCommitMarkerMismatch( + { + meta: { + revision: 12, + }, + }, + marker, +); +assert.equal(noMismatch.mismatched, false); + +console.log("persistence-commit-marker tests passed"); diff --git a/ui/panel.html b/ui/panel.html index f7be018..da12d4f 100644 --- a/ui/panel.html +++ b/ui/panel.html @@ -200,6 +200,10 @@ — +
+ +
+
diff --git a/ui/panel.js b/ui/panel.js index 1f1d5ac..499c470 100644 --- a/ui/panel.js +++ b/ui/panel.js @@ -298,6 +298,7 @@ let popupRuntimePromise = null; let _getGraph = null; let _getSettings = null; let _getLastExtract = null; +let _getLastBatchStatus = null; let _getLastRecall = null; let _getRuntimeStatus = null; let _getLastExtractionStatus = null; @@ -508,6 +509,7 @@ export async function initPanel({ getGraph, getSettings, getLastExtract, + getLastBatchStatus, getLastRecall, getRuntimeStatus, getLastExtractionStatus, @@ -522,6 +524,7 @@ export async function initPanel({ _getGraph = getGraph; _getSettings = getSettings; _getLastExtract = getLastExtract; + _getLastBatchStatus = getLastBatchStatus; _getLastRecall = getLastRecall; _getRuntimeStatus = getRuntimeStatus; _getLastExtractionStatus = getLastExtractionStatus; @@ -1752,6 +1755,7 @@ function _refreshDashboard() { _setText("bme-status-vector", "等待聊天图谱元数据加载"); _setText("bme-status-recovery", "等待聊天图谱元数据加载"); _setText("bme-status-last-extract", "等待聊天图谱元数据加载"); + _setText("bme-status-last-persist", "等待聊天图谱元数据加载"); _setText("bme-status-last-vector", "等待聊天图谱元数据加载"); _setText("bme-status-last-recall", "等待聊天图谱元数据加载"); _renderStatefulListPlaceholder( @@ -1786,6 +1790,7 @@ function _refreshDashboard() { const vectorSource = graph?.vectorIndexState?.source || "—"; const recovery = graph?.historyState?.lastRecoveryResult; const extractionStatus = _getLastExtractionStatus?.() || {}; + const lastBatchStatus = _getLatestBatchStatusSnapshot(); const vectorStatus = _getLastVectorStatus?.() || {}; const recallStatus = _getLastRecallStatus?.() || {}; const historyPrefix = @@ -1798,11 +1803,7 @@ function _refreshDashboard() { _setText("bme-status-chat-id", chatId); _setText( "bme-status-history", - `${historyPrefix}${ - Number.isFinite(dirtyFrom) - ? `脏区从楼层 ${dirtyFrom} 开始,已处理到 ${lastProcessed}` - : `干净,已处理到楼层 ${lastProcessed}` - }`, + `${historyPrefix}${_formatDashboardHistoryMeta(graph, loadInfo, lastBatchStatus)}`, ); _setText( "bme-status-vector", @@ -1829,6 +1830,10 @@ function _refreshDashboard() { : "暂无恢复记录", ); _setText("bme-status-last-extract", extractionStatus.meta || "尚未执行提取"); + _setText( + "bme-status-last-persist", + _formatDashboardPersistMeta(loadInfo, lastBatchStatus), + ); _setText("bme-status-last-vector", vectorStatus.meta || "尚未执行向量任务"); _setText("bme-status-last-recall", recallStatus.meta || "尚未执行召回"); @@ -5934,6 +5939,7 @@ function _summarizeMonitorGovernance(entry = {}) { : []; const requestCleaning = entry?.requestCleaning || null; const responseCleaning = entry?.responseCleaning || null; + const persistence = entry?.batchStatus?.persistence || entry?.persistence || null; const lines = []; if (worldInfo) { @@ -5964,6 +5970,11 @@ function _summarizeMonitorGovernance(entry = {}) { if (entry?.jsonFailure?.failureReason) { lines.push(`失败原因: ${String(entry.jsonFailure.failureReason || "")}`); } + if (persistence) { + lines.push( + `持久化: ${_formatPersistenceOutcomeLabel(persistence.outcome)} · ${String(persistence.storageTier || "none")}${persistence.reason ? ` · ${String(persistence.reason)}` : ""}`, + ); + } return lines; } @@ -7402,10 +7413,18 @@ function _renderTaskDebugGraphPersistenceCard(graphPersistence) { 最近已持久化 revision ${_escHtml(String(graphPersistence.lastPersistedRevision ?? 0))}
+
+ 最近已接受 revision + ${_escHtml(String(graphPersistence.lastAcceptedRevision ?? 0))} +
排队中的 revision ${_escHtml(String(graphPersistence.queuedPersistRevision ?? 0))}
+
+ 待确认写入 + ${_escHtml(graphPersistence.pendingPersist ? "是" : "否")} +
影子快照 ${_escHtml(graphPersistence.shadowSnapshotUsed ? "已接管" : "未使用")} @@ -7414,6 +7433,24 @@ function _renderTaskDebugGraphPersistenceCard(graphPersistence) { 写保护 ${_escHtml(graphPersistence.writesBlocked ? "已启用" : "未启用")}
+
+ 一致性异常 + ${_escHtml(graphPersistence.persistMismatchReason || "—")} +
+
+ Commit Marker + ${_escHtml( + graphPersistence.commitMarker + ? [ + `rev ${Number(graphPersistence.commitMarker.revision || 0)}`, + graphPersistence.commitMarker.accepted === true ? "accepted" : "pending", + graphPersistence.commitMarker.storageTier || "", + ] + .filter(Boolean) + .join(" · ") + : "—", + )} +
${_renderDebugDetails("图谱持久化详情", graphPersistence)} `; @@ -9170,6 +9207,9 @@ function _getGraphPersistenceSnapshot() { writesBlocked: true, shadowSnapshotUsed: false, pendingPersist: false, + lastAcceptedRevision: 0, + persistMismatchReason: "", + commitMarker: null, chatId: "", storageMode: "indexeddb", dbReady: false, @@ -9181,6 +9221,86 @@ function _getGraphPersistenceSnapshot() { }; } +function _getLatestBatchStatusSnapshot() { + return _getLastBatchStatus?.() || null; +} + +function _formatPersistenceOutcomeLabel(outcome = "") { + switch (String(outcome || "")) { + case "saved": + return "已保存"; + case "fallback": + return "兜底已保存"; + case "queued": + return "已排队"; + case "blocked": + return "已阻塞"; + case "failed": + return "失败"; + default: + return "未知"; + } +} + +function _formatDashboardPersistMeta(loadInfo = {}, batchStatus = null) { + const persistence = batchStatus?.persistence || null; + if (persistence) { + const parts = [ + _formatPersistenceOutcomeLabel(persistence.outcome), + persistence.storageTier ? `tier ${persistence.storageTier}` : "", + Number.isFinite(Number(persistence.revision)) && Number(persistence.revision) > 0 + ? `rev ${Number(persistence.revision)}` + : "", + persistence.reason || "", + ].filter(Boolean); + return parts.join(" · ") || "尚无持久化记录"; + } + + const dualWrite = loadInfo?.dualWriteLastResult || null; + if (dualWrite) { + return [ + dualWrite.success === true ? "最近写入成功" : "最近写入失败", + dualWrite.target || dualWrite.source || "", + Number.isFinite(Number(dualWrite.revision)) && Number(dualWrite.revision) > 0 + ? `rev ${Number(dualWrite.revision)}` + : "", + dualWrite.reason || dualWrite.error || "", + ] + .filter(Boolean) + .join(" · "); + } + + return "尚未执行持久化"; +} + +function _formatDashboardHistoryMeta(graph = null, loadInfo = {}, batchStatus = null) { + const lastConfirmedFloor = + graph?.historyState?.lastProcessedAssistantFloor ?? -1; + const persistence = batchStatus?.persistence || null; + const processedRange = Array.isArray(batchStatus?.processedRange) + ? batchStatus.processedRange + : []; + const pendingFloor = + processedRange.length > 1 && Number.isFinite(Number(processedRange[1])) + ? Number(processedRange[1]) + : null; + + if (persistence && persistence.accepted !== true && pendingFloor != null) { + return `持久化待确认:本地已抽取到楼层 ${pendingFloor},已确认楼层 ${lastConfirmedFloor}`; + } + + if (loadInfo?.persistMismatchReason) { + return `持久化一致性异常:${String(loadInfo.persistMismatchReason || "")} · 已确认楼层 ${lastConfirmedFloor}`; + } + + const dirtyFrom = graph?.historyState?.historyDirtyFrom; + if (Number.isFinite(dirtyFrom)) { + return `脏区从楼层 ${dirtyFrom} 开始,已确认处理到楼层 ${lastConfirmedFloor}`; + } + + return `干净,已确认处理到楼层 ${lastConfirmedFloor}`; +} + function _getGraphLoadLabel(loadState = "") { switch (loadState) { case "loading": diff --git a/ui/ui-status.js b/ui/ui-status.js index 24b6742..7a42ed1 100644 --- a/ui/ui-status.js +++ b/ui/ui-status.js @@ -49,6 +49,9 @@ export function createGraphPersistenceState() { metadataIntegrity: "", writesBlocked: false, pendingPersist: false, + lastAcceptedRevision: 0, + persistMismatchReason: "", + commitMarker: null, storagePrimary: "indexeddb", storageMode: "indexeddb", dbReady: false, @@ -133,6 +136,14 @@ export function createBatchStatusSkeleton({ outcome: "success", consistency: "strong", completed: false, + persistence: { + outcome: "queued", + accepted: false, + storageTier: "none", + reason: "", + revision: 0, + }, + historyAdvanceAllowed: false, warnings: [], errors: [], };