diff --git a/index.js b/index.js index 7665cd7..b7a69d3 100644 --- a/index.js +++ b/index.js @@ -190,6 +190,7 @@ import { } from "./runtime/settings-defaults.js"; import { retrieve } from "./retrieval/retriever.js"; import { + applyProcessedHistorySnapshotToGraph, appendBatchJournal, appendMaintenanceJournal, buildRecoveryResult, @@ -5896,16 +5897,22 @@ function buildGraphPersistResult({ }; } -function maybeCaptureGraphShadowSnapshot(reason = "runtime-shadow") { - const chatId = graphPersistenceState.chatId || getCurrentChatId(); - if (!chatId || !currentGraph) return false; +function maybeCaptureGraphShadowSnapshot( + reason = "runtime-shadow", + { + graph = currentGraph, + chatId = graphPersistenceState.chatId || getCurrentChatId(), + revision = graphPersistenceState.revision, + } = {}, +) { + if (!chatId || !graph) return false; const hasMeaningfulGraphData = - !isGraphEffectivelyEmpty(currentGraph) || + !isGraphEffectivelyEmpty(graph) || graphPersistenceState.shadowSnapshotUsed || graphPersistenceState.lastPersistedRevision > 0; if (!hasMeaningfulGraphData) return false; - return writeGraphShadowSnapshot(chatId, currentGraph, { - revision: graphPersistenceState.revision, + return writeGraphShadowSnapshot(chatId, graph, { + revision, reason, }); } @@ -6023,10 +6030,57 @@ function resolvePendingPersistLastProcessedAssistantFloor() { return null; } +function resolvePendingPersistGraphSource(chatId = "") { + const normalizedChatId = normalizeChatIdCandidate( + chatId || graphPersistenceState.queuedPersistChatId || graphPersistenceState.chatId, + ); + const targetRevision = Math.max( + Number(graphPersistenceState.queuedPersistRevision || 0), + Number(graphPersistenceState.revision || 0), + ); + const shadowSnapshot = normalizedChatId + ? readGraphShadowSnapshot(normalizedChatId) + : null; + + if ( + shadowSnapshot && + Number(shadowSnapshot.revision || 0) >= targetRevision && + typeof shadowSnapshot.serializedGraph === "string" && + shadowSnapshot.serializedGraph + ) { + try { + const shadowGraph = cloneGraphForPersistence( + normalizeGraphRuntimeState( + deserializeGraph(shadowSnapshot.serializedGraph), + normalizedChatId, + ), + normalizedChatId, + ); + return { + graph: shadowGraph, + source: "shadow", + revision: Number(shadowSnapshot.revision || 0), + }; + } catch (error) { + console.warn("[ST-BME] pending persist shadow graph 恢复失败:", error); + } + } + + return { + graph: currentGraph, + source: "runtime", + revision: Math.max( + Number(getGraphPersistedRevision(currentGraph) || 0), + targetRevision, + ), + }; +} + function applyAcceptedPendingPersistState( persistResult, { lastProcessedAssistantFloor = resolvePendingPersistLastProcessedAssistantFloor(), + persistedGraph = null, } = {}, ) { ensureCurrentGraphRuntimeState(); @@ -6042,6 +6096,36 @@ function applyAcceptedPendingPersistState( currentGraph.historyState.lastBatchStatus = batchStatus; } + if ( + persistedGraph && + typeof persistedGraph === "object" && + !Array.isArray(persistedGraph) + ) { + const persistedHistory = + persistedGraph.historyState && + typeof persistedGraph.historyState === "object" && + !Array.isArray(persistedGraph.historyState) + ? persistedGraph.historyState + : null; + if (persistedHistory) { + currentGraph.historyState.processedMessageHashVersion = + persistedHistory.processedMessageHashVersion ?? + currentGraph.historyState.processedMessageHashVersion; + currentGraph.historyState.processedMessageHashes = cloneRuntimeDebugValue( + persistedHistory.processedMessageHashes || {}, + currentGraph.historyState.processedMessageHashes || {}, + ); + currentGraph.historyState.processedMessageHashesNeedRefresh = + persistedHistory.processedMessageHashesNeedRefresh === true; + } + if (Array.isArray(persistedGraph.batchJournal)) { + currentGraph.batchJournal = cloneRuntimeDebugValue( + persistedGraph.batchJournal, + currentGraph.batchJournal || [], + ); + } + } + if ( persistenceRecord.accepted === true && Number.isFinite(Number(lastProcessedAssistantFloor)) && @@ -6133,9 +6217,10 @@ function persistGraphToChatMetadata( reason = "graph-persist", revision = graphPersistenceState.revision, immediate = false, + graph = currentGraph, } = {}, ) { - if (!context || !currentGraph) { + if (!context || !graph) { return buildGraphPersistResult({ saved: false, blocked: true, @@ -6155,19 +6240,21 @@ function persistGraphToChatMetadata( } const nextIntegrity = getChatMetadataIntegrity(context); - const persistedGraph = cloneGraphForPersistence(currentGraph, chatId); + const persistedGraph = cloneGraphForPersistence(graph, chatId); stampGraphPersistenceMeta(persistedGraph, { revision, reason, chatId, integrity: nextIntegrity, }); - stampGraphPersistenceMeta(currentGraph, { - revision, - reason, - chatId, - integrity: nextIntegrity, - }); + if (graph === currentGraph) { + stampGraphPersistenceMeta(currentGraph, { + revision, + reason, + chatId, + integrity: nextIntegrity, + }); + } writeChatMetadataPatch(context, { [GRAPH_METADATA_KEY]: persistedGraph, }); @@ -6219,10 +6306,15 @@ function persistGraphToChatMetadata( function queueGraphPersist( reason = "graph-persist-blocked", revision = graphPersistenceState.revision, - { immediate = true } = {}, + { immediate = true, graph = currentGraph, chatId = undefined } = {}, ) { - const queuedChatId = graphPersistenceState.chatId || getCurrentChatId(); - const shadowCaptured = maybeCaptureGraphShadowSnapshot(reason); + const queuedChatId = + String(chatId || graphPersistenceState.chatId || getCurrentChatId()) || ""; + const shadowCaptured = maybeCaptureGraphShadowSnapshot(reason, { + graph, + chatId: queuedChatId, + revision, + }); updateGraphPersistenceState({ queuedPersistRevision: Math.max( graphPersistenceState.queuedPersistRevision || 0, @@ -6380,16 +6472,20 @@ async function retryPendingGraphPersist({ }); } + const pendingPersistGraphSource = resolvePendingPersistGraphSource( + queuedChatId, + ); + const pendingPersistGraph = pendingPersistGraphSource?.graph || currentGraph; const targetRevision = Math.max( Number(graphPersistenceState.queuedPersistRevision || 0), Number(graphPersistenceState.revision || 0), Number(graphPersistenceState.lastPersistedRevision || 0), - Number(getGraphPersistedRevision(currentGraph) || 0), + Number(pendingPersistGraphSource?.revision || 0), + Number(getGraphPersistedRevision(pendingPersistGraph) || 0), ); const lastProcessedAssistantFloor = resolvePendingPersistLastProcessedAssistantFloor(); - - const indexedDbResult = await saveGraphToIndexedDb(activeChatId, currentGraph, { + const indexedDbResult = await saveGraphToIndexedDb(activeChatId, pendingPersistGraph, { revision: targetRevision, reason, }); @@ -6429,17 +6525,19 @@ async function retryPendingGraphPersist({ }); applyAcceptedPendingPersistState(persistResult, { lastProcessedAssistantFloor, + persistedGraph: pendingPersistGraph, }); void maybeResumePendingAutoExtraction("pending-persist-resolved:indexeddb"); return persistResult; } - if (canPersistGraphToMetadataFallback(context, currentGraph)) { + if (canPersistGraphToMetadataFallback(context, pendingPersistGraph)) { const metadataReason = `${reason}:metadata-full-fallback`; const metadataResult = persistGraphToChatMetadata(context, { reason: metadataReason, revision: targetRevision, immediate: true, + graph: pendingPersistGraph, }); if (metadataResult?.saved) { clearPendingGraphPersistRetry(); @@ -6477,6 +6575,7 @@ async function retryPendingGraphPersist({ }); applyAcceptedPendingPersistState(persistResult, { lastProcessedAssistantFloor, + persistedGraph: pendingPersistGraph, }); void maybeResumePendingAutoExtraction("pending-persist-resolved:metadata"); return persistResult; @@ -6501,10 +6600,15 @@ async function retryPendingGraphPersist({ async function persistExtractionBatchResult({ reason = "extraction-batch-complete", lastProcessedAssistantFloor = null, + graphSnapshot = null, } = {}) { ensureCurrentGraphRuntimeState(); const context = getContext(); - if (!context || !currentGraph) { + const persistGraph = + graphSnapshot && typeof graphSnapshot === "object" + ? cloneGraphSnapshot(graphSnapshot) + : currentGraph; + if (!context || !persistGraph) { return buildGraphPersistResult({ saved: false, blocked: true, @@ -6526,7 +6630,7 @@ async function persistExtractionBatchResult({ } const revision = bumpGraphRevision(reason); - const indexedDbResult = await saveGraphToIndexedDb(chatId, currentGraph, { + const indexedDbResult = await saveGraphToIndexedDb(chatId, persistGraph, { revision, reason, }); @@ -6567,7 +6671,11 @@ async function persistExtractionBatchResult({ } const shadowReason = `${reason}:shadow-fallback`; - const shadowCaptured = maybeCaptureGraphShadowSnapshot(shadowReason); + const shadowCaptured = maybeCaptureGraphShadowSnapshot(shadowReason, { + graph: persistGraph, + chatId, + revision, + }); if (shadowCaptured) { if (isGraphMetadataWriteAllowed()) { persistGraphCommitMarker(context, { @@ -6596,6 +6704,10 @@ async function persistExtractionBatchResult({ queuedPersistReason: "", }); clearPendingGraphPersistRetry(); + queueGraphPersistToIndexedDb(chatId, persistGraph, { + revision, + reason: `${shadowReason}:promote-indexeddb`, + }); return buildGraphPersistResult({ saved: false, accepted: true, @@ -6606,12 +6718,13 @@ async function persistExtractionBatchResult({ }); } - if (canPersistGraphToMetadataFallback(context, currentGraph)) { + if (canPersistGraphToMetadataFallback(context, persistGraph)) { const metadataReason = `${reason}:metadata-full-fallback`; const metadataResult = persistGraphToChatMetadata(context, { reason: metadataReason, revision, immediate: true, + graph: persistGraph, }); if (metadataResult?.saved) { persistGraphCommitMarker(context, { @@ -6637,6 +6750,10 @@ async function persistExtractionBatchResult({ queuedPersistReason: "", }); clearPendingGraphPersistRetry(); + queueGraphPersistToIndexedDb(chatId, persistGraph, { + revision, + reason: `${metadataReason}:promote-indexeddb`, + }); return buildGraphPersistResult({ saved: true, accepted: true, @@ -6650,8 +6767,9 @@ async function persistExtractionBatchResult({ const queuedResult = queueGraphPersist(`${reason}:pending`, revision, { immediate: true, + graph: persistGraph, + chatId, }); - schedulePendingGraphPersistRetry(`${reason}:pending`, 0); updateGraphPersistenceState({ pendingPersist: true, lastPersistReason: String(queuedResult.reason || `${reason}:pending`), @@ -7331,14 +7449,11 @@ function markVectorStateDirty(reason = "向量状态已标记为待重建") { function updateProcessedHistorySnapshot(chat, lastProcessedAssistantFloor) { ensureCurrentGraphRuntimeState(); - currentGraph.historyState.lastProcessedAssistantFloor = - lastProcessedAssistantFloor; - currentGraph.historyState.processedMessageHashVersion = - PROCESSED_MESSAGE_HASH_VERSION; - currentGraph.historyState.processedMessageHashes = - snapshotProcessedMessageHashes(chat, lastProcessedAssistantFloor); - currentGraph.historyState.processedMessageHashesNeedRefresh = false; - currentGraph.lastProcessedSeq = lastProcessedAssistantFloor; + applyProcessedHistorySnapshotToGraph( + currentGraph, + chat, + lastProcessedAssistantFloor, + ); } function shouldAdvanceProcessedHistory(batchStatus) { @@ -10324,6 +10439,7 @@ async function executeExtractionBatch({ return await executeExtractionBatchController( { appendBatchJournal, + applyProcessedHistorySnapshotToGraph, buildExtractionMessages, cloneGraphSnapshot, computePostProcessArtifacts, diff --git a/maintenance/extraction-controller.js b/maintenance/extraction-controller.js index 7cd3331..1e6fb60 100644 --- a/maintenance/extraction-controller.js +++ b/maintenance/extraction-controller.js @@ -84,6 +84,98 @@ function normalizePersistenceStateRecord(persistResult = null) { }; } +function cloneSerializable(value, fallback = null) { + try { + return JSON.parse(JSON.stringify(value)); + } catch { + return fallback; + } +} + +function buildCommittedBatchPersistSnapshot( + runtime, + { + graph = null, + chat = [], + beforeSnapshot = null, + processedRange = [null, null], + postProcessArtifacts = [], + vectorHashesInserted = [], + extractionCountBefore = 0, + } = {}, +) { + if (!graph || typeof runtime?.cloneGraphSnapshot !== "function") { + return { + persistGraphSnapshot: null, + committedBatchJournalEntry: null, + afterSnapshot: null, + committedAfterSnapshot: null, + postProcessArtifacts: Array.isArray(postProcessArtifacts) + ? [...postProcessArtifacts] + : [], + }; + } + + const range = Array.isArray(processedRange) ? processedRange : [null, null]; + const rangeStart = Number.isFinite(Number(range[0])) ? Number(range[0]) : null; + const rangeEnd = Number.isFinite(Number(range[1])) ? Number(range[1]) : null; + const afterSnapshot = runtime.cloneGraphSnapshot(graph); + const effectiveArtifacts = Array.isArray(postProcessArtifacts) + ? [...postProcessArtifacts] + : []; + const committedGraphSnapshot = runtime.cloneGraphSnapshot(graph); + + if (typeof runtime.applyProcessedHistorySnapshotToGraph === "function") { + runtime.applyProcessedHistorySnapshotToGraph( + committedGraphSnapshot, + chat, + rangeEnd, + ); + } else { + if ( + !committedGraphSnapshot.historyState || + typeof committedGraphSnapshot.historyState !== "object" || + Array.isArray(committedGraphSnapshot.historyState) + ) { + committedGraphSnapshot.historyState = {}; + } + committedGraphSnapshot.historyState.lastProcessedAssistantFloor = + Number.isFinite(rangeEnd) ? Math.floor(rangeEnd) : -1; + committedGraphSnapshot.lastProcessedSeq = + Number.isFinite(rangeEnd) ? Math.floor(rangeEnd) : -1; + } + + const committedBatchJournalEntry = + typeof runtime.createBatchJournalEntry === "function" + ? runtime.createBatchJournalEntry(beforeSnapshot, afterSnapshot, { + processedRange: [rangeStart, rangeEnd], + postProcessArtifacts: effectiveArtifacts, + vectorHashesInserted: Array.isArray(vectorHashesInserted) + ? vectorHashesInserted + : [], + extractionCountBefore, + }) + : null; + + if ( + committedBatchJournalEntry && + typeof runtime.appendBatchJournal === "function" + ) { + runtime.appendBatchJournal( + committedGraphSnapshot, + cloneSerializable(committedBatchJournalEntry, committedBatchJournalEntry), + ); + } + + return { + persistGraphSnapshot: committedGraphSnapshot, + committedBatchJournalEntry, + afterSnapshot, + committedAfterSnapshot: runtime.cloneGraphSnapshot(committedGraphSnapshot), + postProcessArtifacts: effectiveArtifacts, + }; +} + function getPendingPersistenceGateInfo(runtime) { const graph = runtime?.getCurrentGraph?.(); const batchStatus = graph?.historyState?.lastBatchStatus || null; @@ -335,9 +427,23 @@ export async function executeExtractionBatchController( batchStatus, ); const batchStatusRef = effects?.batchStatus || batchStatus; + const committedPersistState = buildCommittedBatchPersistSnapshot(runtime, { + graph: runtime.getCurrentGraph(), + chat, + beforeSnapshot, + processedRange: [startIdx, endIdx], + postProcessArtifacts: runtime.computePostProcessArtifacts( + beforeSnapshot, + runtime.cloneGraphSnapshot(runtime.getCurrentGraph()), + effects?.postProcessArtifacts || [], + ), + vectorHashesInserted: effects?.vectorHashesInserted || [], + extractionCountBefore, + }); const persistResult = await runtime.persistExtractionBatchResult({ reason: "extraction-batch-complete", lastProcessedAssistantFloor: endIdx, + graphSnapshot: committedPersistState.persistGraphSnapshot, }); const persistence = normalizePersistenceStateRecord(persistResult); batchStatusRef.persistence = persistence; @@ -359,6 +465,15 @@ export async function executeExtractionBatchController( if (runtime.getCurrentGraph().historyState.lastBatchStatus.historyAdvanced) { runtime.updateProcessedHistorySnapshot(chat, endIdx); + if (committedPersistState.committedBatchJournalEntry) { + runtime.appendBatchJournal( + runtime.getCurrentGraph(), + cloneSerializable( + committedPersistState.committedBatchJournalEntry, + committedPersistState.committedBatchJournalEntry, + ), + ); + } } else if (!persistence.accepted) { runtime.setLastExtractionStatus( "提取待恢复", @@ -373,22 +488,6 @@ export async function executeExtractionBatchController( }); } - const afterSnapshot = runtime.cloneGraphSnapshot(runtime.getCurrentGraph()); - const postProcessArtifacts = runtime.computePostProcessArtifacts( - beforeSnapshot, - afterSnapshot, - effects?.postProcessArtifacts || [], - ); - runtime.appendBatchJournal( - runtime.getCurrentGraph(), - runtime.createBatchJournalEntry(beforeSnapshot, afterSnapshot, { - processedRange: [startIdx, endIdx], - postProcessArtifacts, - vectorHashesInserted: effects?.vectorHashesInserted || [], - extractionCountBefore, - }), - ); - return { success: finalizedBatchStatus.completed, result, diff --git a/runtime/runtime-state.js b/runtime/runtime-state.js index 59e0570..1c82739 100644 --- a/runtime/runtime-state.js +++ b/runtime/runtime-state.js @@ -388,6 +388,38 @@ export function snapshotProcessedMessageHashes( return result; } +export function applyProcessedHistorySnapshotToGraph( + graph, + chat, + lastProcessedAssistantFloor, +) { + if (!graph || typeof graph !== "object") { + return graph; + } + + const historyState = + graph.historyState && typeof graph.historyState === "object" + ? graph.historyState + : createDefaultHistoryState(graph?.historyState?.chatId || ""); + graph.historyState = historyState; + + const safeLastProcessedAssistantFloor = Number.isFinite( + Number(lastProcessedAssistantFloor), + ) + ? Math.floor(Number(lastProcessedAssistantFloor)) + : -1; + + historyState.lastProcessedAssistantFloor = safeLastProcessedAssistantFloor; + historyState.processedMessageHashVersion = PROCESSED_MESSAGE_HASH_VERSION; + historyState.processedMessageHashes = + safeLastProcessedAssistantFloor >= 0 + ? snapshotProcessedMessageHashes(chat, safeLastProcessedAssistantFloor) + : {}; + historyState.processedMessageHashesNeedRefresh = false; + graph.lastProcessedSeq = safeLastProcessedAssistantFloor; + return graph; +} + export function rebindProcessedHistoryStateToChat( graph, chat, diff --git a/tests/extraction-persistence-gating.mjs b/tests/extraction-persistence-gating.mjs index afbd3bc..e90c0c0 100644 --- a/tests/extraction-persistence-gating.mjs +++ b/tests/extraction-persistence-gating.mjs @@ -12,12 +12,15 @@ function createRuntime(persistResult) { nodes: [], edges: [], historyState: {}, + batchJournal: [], }; let processedHistoryUpdates = 0; + let persistedGraphSnapshot = null; return { graph, processedHistoryUpdates, + persistedGraphSnapshot, ensureCurrentGraphRuntimeState() {}, throwIfAborted() {}, getCurrentGraph() { @@ -64,6 +67,7 @@ function createRuntime(persistResult) { }; }, async persistExtractionBatchResult() { + persistedGraphSnapshot = arguments[0]?.graphSnapshot || null; return persistResult; }, finalizeBatchStatus, @@ -73,13 +77,20 @@ function createRuntime(persistResult) { updateProcessedHistorySnapshot() { processedHistoryUpdates += 1; }, - appendBatchJournal() {}, + appendBatchJournal(targetGraph, entry) { + if (!targetGraph.batchJournal) targetGraph.batchJournal = []; + targetGraph.batchJournal.push(entry); + }, createBatchJournalEntry() { - return { id: "journal-1" }; + return { id: "journal-1", processedRange: [5, 5] }; }, computePostProcessArtifacts() { return []; }, + applyProcessedHistorySnapshotToGraph(targetGraph, _chat, floor) { + targetGraph.historyState.lastProcessedAssistantFloor = floor; + targetGraph.lastProcessedSeq = floor; + }, getGraphPersistenceState() { return { chatId: "chat-test" }; }, @@ -87,6 +98,9 @@ function createRuntime(persistResult) { get processedHistoryUpdates() { return processedHistoryUpdates; }, + get persistedGraphSnapshot() { + return persistedGraphSnapshot; + }, }; } @@ -119,6 +133,14 @@ function createRuntime(persistResult) { runtime.graph.historyState.lastBatchStatus.historyAdvanceAllowed, false, ); + assert.equal( + runtime.persistedGraphSnapshot?.historyState?.lastProcessedAssistantFloor, + 5, + ); + assert.equal( + runtime.persistedGraphSnapshot?.batchJournal?.length, + 1, + ); } { @@ -150,6 +172,14 @@ function createRuntime(persistResult) { runtime.graph.historyState.lastBatchStatus.historyAdvanceAllowed, true, ); + assert.equal( + runtime.persistedGraphSnapshot?.historyState?.lastProcessedAssistantFloor, + 5, + ); + assert.equal( + runtime.persistedGraphSnapshot?.batchJournal?.length, + 1, + ); } console.log("extraction-persistence-gating tests passed"); diff --git a/tests/graph-persistence.mjs b/tests/graph-persistence.mjs index 7d0c0d5..e00fc86 100644 --- a/tests/graph-persistence.mjs +++ b/tests/graph-persistence.mjs @@ -2332,6 +2332,16 @@ result = { historyAdvanceAllowed: false, historyAdvanced: false, }; + const committedGraph = structuredClone(graph); + committedGraph.historyState.lastProcessedAssistantFloor = 1; + committedGraph.lastProcessedSeq = 1; + committedGraph.batchJournal = [ + { + id: "journal-queued-1", + processedRange: [1, 1], + createdAt: Date.now(), + }, + ]; harness.api.setCurrentGraph(graph); harness.api.setGraphPersistenceState({ loadState: "loaded", @@ -2344,6 +2354,14 @@ result = { pendingPersist: true, writesBlocked: false, }); + harness.api.writeGraphShadowSnapshot( + "chat-pending-persist-retry", + committedGraph, + { + revision: 7, + reason: "queued-persist-authoritative", + }, + ); harness.runtimeContext.__markSyncDirtyShouldThrow = true; const result = await harness.api.retryPendingGraphPersist({ @@ -2369,6 +2387,15 @@ result = { harness.api.getCurrentGraph().historyState.lastBatchStatus.persistence.outcome, "saved", ); + assert.equal( + harness.api.getCurrentGraph().batchJournal?.length, + 1, + "pending persist retry 应把 authoritative batch journal 回填到 runtime graph", + ); + assert.equal( + harness.api.getCurrentGraph().batchJournal?.[0]?.id, + "journal-queued-1", + ); } {