From 2823d181677e600722720bebd791ff58b7654f16 Mon Sep 17 00:00:00 2001 From: opencode Date: Fri, 15 May 2026 19:44:08 +0000 Subject: [PATCH] fix(persistence): clear stale pending confirmations --- index.js | 101 ++++++++++++++++++ maintenance/extraction-controller.js | 22 +++- tests/graph-persistence.mjs | 132 +++++++++++++++++++++++ tests/p0-regressions.mjs | 153 +++++++++++++++++++++++++++ 4 files changed, 407 insertions(+), 1 deletion(-) diff --git a/index.js b/index.js index 6f17a0b..fb6a4be 100644 --- a/index.js +++ b/index.js @@ -15038,6 +15038,91 @@ function applyAcceptedPendingPersistState( refreshPanelLiveState(); } +function maybeClearAcceptedPendingPersistState( + source = "accepted-pending-persist-reconcile", +) { + ensureCurrentGraphRuntimeState(); + if (graphPersistenceState.pendingPersist !== true) { + return false; + } + + const batchStatus = currentGraph?.historyState?.lastBatchStatus || null; + const persistence = batchStatus?.persistence || null; + const persistenceRevision = Number(persistence?.revision || 0); + const queuedRevision = Number(graphPersistenceState.queuedPersistRevision || 0); + const targetRevision = Math.max( + Number.isFinite(persistenceRevision) ? persistenceRevision : 0, + Number.isFinite(queuedRevision) ? queuedRevision : 0, + ); + if (!Number.isFinite(targetRevision) || targetRevision <= 0) { + return false; + } + + const commitMarker = syncCommitMarkerToPersistenceState(getContext()); + const context = getContext(); + const activeChatId = normalizeChatIdCandidate(getCurrentChatId(context)); + const queuedChatId = normalizeChatIdCandidate( + graphPersistenceState.queuedPersistChatId || + graphPersistenceState.chatId || + activeChatId, + ); + const currentIdentity = resolveCurrentChatIdentity(context); + if ( + !activeChatId || + !queuedChatId || + (!areChatIdsEquivalentForResolvedIdentity( + queuedChatId, + activeChatId, + currentIdentity, + ) && + !areChatIdsEquivalentForResolvedIdentity( + activeChatId, + queuedChatId, + currentIdentity, + )) + ) { + return false; + } + const markerChatId = normalizeChatIdCandidate(commitMarker?.chatId); + const markerAcceptedRevision = getAcceptedCommitMarkerRevision(commitMarker); + const markerAcceptedForQueuedChat = + markerAcceptedRevision > 0 && + markerChatId && + (areChatIdsEquivalentForResolvedIdentity(markerChatId, queuedChatId, currentIdentity) || + areChatIdsEquivalentForResolvedIdentity( + queuedChatId, + markerChatId, + currentIdentity, + )); + const acceptedRevision = Math.max( + Number(graphPersistenceState.lastAcceptedRevision || 0), + markerAcceptedForQueuedChat ? Number(markerAcceptedRevision || 0) : 0, + ); + if (!Number.isFinite(acceptedRevision) || acceptedRevision < targetRevision) { + return false; + } + + const acceptedStorageTier = + String(graphPersistenceState.acceptedStorageTier || "").trim() || + String(commitMarker?.storageTier || "").trim() || + String(persistence?.storageTier || "").trim() || + "none"; + const acceptedResult = buildGraphPersistResult({ + saved: true, + accepted: true, + reason: `${String(source || "accepted-pending-persist-reconcile")}:accepted-revision`, + revision: targetRevision, + saveMode: "accepted-revision-reconcile", + storageTier: acceptedStorageTier, + acceptedBy: acceptedStorageTier, + }); + applyAcceptedPendingPersistState(acceptedResult, { + lastProcessedAssistantFloor: resolvePendingPersistLastProcessedAssistantFloor(), + }); + clearPendingGraphPersistRetry(); + return true; +} + function schedulePendingGraphPersistRetry( reason = "pending-graph-persist-retry", attempt = 0, @@ -15314,6 +15399,22 @@ async function retryPendingGraphPersist({ }); } + if (maybeClearAcceptedPendingPersistState(reason)) { + return buildGraphPersistResult({ + saved: true, + blocked: false, + accepted: true, + reason: `${String(reason || "pending-graph-persist-retry")}:accepted-revision`, + revision: Math.max( + Number(graphPersistenceState.lastAcceptedRevision || 0), + Number(graphPersistenceState.revision || 0), + ), + saveMode: "accepted-revision-reconcile", + storageTier: String(graphPersistenceState.acceptedStorageTier || "none"), + acceptedBy: String(graphPersistenceState.acceptedStorageTier || "none"), + }); + } + const context = getContext(); const activeChatId = normalizeChatIdCandidate(getCurrentChatId(context)); const queuedChatId = normalizeChatIdCandidate( diff --git a/maintenance/extraction-controller.js b/maintenance/extraction-controller.js index 20d3b36..ed96aff 100644 --- a/maintenance/extraction-controller.js +++ b/maintenance/extraction-controller.js @@ -545,6 +545,21 @@ function isPersistenceRevisionAccepted(runtime, persistence = null) { return Number.isFinite(lastAcceptedRevision) && lastAcceptedRevision >= persistenceRevision; } +function isPendingPersistenceRevisionAccepted(runtime, persistence = null) { + const persistenceState = runtime?.getGraphPersistenceState?.() || {}; + const persistenceRevision = Number(persistence?.revision || 0); + const queuedRevision = Number(persistenceState.queuedPersistRevision || 0); + const targetRevision = Math.max( + Number.isFinite(persistenceRevision) ? persistenceRevision : 0, + Number.isFinite(queuedRevision) ? queuedRevision : 0, + ); + if (!Number.isFinite(targetRevision) || targetRevision <= 0) { + return false; + } + const lastAcceptedRevision = Number(persistenceState.lastAcceptedRevision || 0); + return Number.isFinite(lastAcceptedRevision) && lastAcceptedRevision >= targetRevision; +} + function hasRecoverablePendingPersistence(runtime) { const persistenceState = runtime?.getGraphPersistenceState?.() || {}; if (persistenceState.pendingPersist !== true) { @@ -572,8 +587,13 @@ function getPendingPersistenceGateInfo(runtime) { const batchStatus = graph?.historyState?.lastBatchStatus || null; const persistence = batchStatus?.persistence || null; const pendingPersist = runtime?.getGraphPersistenceState?.()?.pendingPersist === true; - const accepted = isPersistenceRevisionAccepted(runtime, persistence); + const accepted = pendingPersist + ? isPendingPersistenceRevisionAccepted(runtime, persistence) + : isPersistenceRevisionAccepted(runtime, persistence); const attempted = hasMeaningfulPersistenceRecord(persistence); + if (pendingPersist && attempted && accepted) { + return null; + } if (!pendingPersist && (!attempted || accepted)) { return null; } diff --git a/tests/graph-persistence.mjs b/tests/graph-persistence.mjs index 855f60f..bf53832 100644 --- a/tests/graph-persistence.mjs +++ b/tests/graph-persistence.mjs @@ -4029,6 +4029,138 @@ result = { ); } +{ + const harness = await createGraphPersistenceHarness({ + chatId: "chat-pending-persist-already-accepted", + globalChatId: "chat-pending-persist-already-accepted", + chatMetadata: { + integrity: "meta-pending-persist-already-accepted", + }, + chat: [ + { is_user: true, mes: "用户发言" }, + { is_user: false, mes: "助手回复" }, + ], + }); + const graph = createMeaningfulGraph( + "chat-pending-persist-already-accepted", + "pending-persist-already-accepted", + ); + graph.historyState.lastProcessedAssistantFloor = 1; + graph.lastProcessedSeq = 1; + graph.historyState.lastBatchStatus = { + processedRange: [1, 1], + completed: true, + persistence: { + outcome: "queued", + accepted: false, + storageTier: "authority-sql", + reason: "extraction-batch-complete:pending", + revision: 7, + saveMode: "immediate", + saved: false, + queued: true, + blocked: true, + }, + historyAdvanceAllowed: false, + historyAdvanced: false, + }; + harness.api.setCurrentGraph(graph); + harness.api.setGraphPersistenceState({ + loadState: "loaded", + chatId: "chat-pending-persist-already-accepted", + revision: 7, + lastPersistedRevision: 7, + lastAcceptedRevision: 7, + acceptedStorageTier: "authority-sql", + queuedPersistRevision: 7, + queuedPersistChatId: "chat-pending-persist-already-accepted", + queuedPersistMode: "immediate", + pendingPersist: true, + writesBlocked: false, + }); + harness.runtimeContext.__markSyncDirtyShouldThrow = true; + + const result = await harness.api.retryPendingGraphPersist({ + reason: "queued-persist-already-accepted-test", + }); + + assert.equal(result.accepted, true); + assert.equal( + harness.api.getGraphPersistenceState().pendingPersist, + false, + "已被 lastAcceptedRevision 覆盖的 pendingPersist 应在重试时直接清除", + ); + assert.equal( + harness.api.getCurrentGraph().historyState.lastBatchStatus.persistence.accepted, + true, + ); + assert.equal( + harness.api.getCurrentGraph().historyState.lastBatchStatus.historyAdvanceAllowed, + true, + ); +} + +{ + const harness = await createGraphPersistenceHarness({ + chatId: "chat-pending-current", + globalChatId: "chat-pending-current", + chatMetadata: { + integrity: "meta-pending-current", + }, + chat: [ + { is_user: true, mes: "当前聊天用户发言" }, + { is_user: false, mes: "当前聊天助手回复" }, + ], + }); + const graph = createMeaningfulGraph( + "chat-pending-current", + "pending-persist-chat-mismatch", + ); + graph.historyState.lastBatchStatus = { + processedRange: [1, 1], + completed: true, + persistence: { + outcome: "queued", + accepted: false, + storageTier: "authority-sql", + reason: "extraction-batch-complete:pending", + revision: 7, + saveMode: "immediate", + saved: false, + queued: true, + blocked: true, + }, + historyAdvanceAllowed: false, + historyAdvanced: false, + }; + harness.api.setCurrentGraph(graph); + harness.api.setGraphPersistenceState({ + loadState: "loaded", + chatId: "chat-pending-current", + revision: 9, + lastPersistedRevision: 9, + lastAcceptedRevision: 9, + acceptedStorageTier: "authority-sql", + queuedPersistRevision: 7, + queuedPersistChatId: "other-chat-pending", + queuedPersistMode: "immediate", + pendingPersist: true, + writesBlocked: false, + }); + + const result = await harness.api.retryPendingGraphPersist({ + reason: "queued-persist-chat-mismatch-test", + }); + + assert.equal(result.accepted, false); + assert.equal(result.reason, "queued-chat-mismatch"); + assert.equal( + harness.api.getGraphPersistenceState().pendingPersist, + true, + "其它聊天的 queued pending 不能被当前聊天 accepted revision 清掉", + ); +} + { const chatId = "meta-authority-indexeddb-migration"; const legacyGraph = stampPersistedGraph( diff --git a/tests/p0-regressions.mjs b/tests/p0-regressions.mjs index b9cc331..56ed305 100644 --- a/tests/p0-regressions.mjs +++ b/tests/p0-regressions.mjs @@ -5570,6 +5570,157 @@ async function testAutoExtractionContinuesWithRecoverablePendingPersistence() { assert.deepEqual(deferredReasons, []); } +async function testAutoExtractionIgnoresAcceptedPendingPersistenceFlag() { + const deferredReasons = []; + const executeCalls = []; + const currentGraph = { + historyState: { + lastBatchStatus: { + processedRange: [1, 1], + persistence: { + outcome: "queued", + accepted: false, + revision: 7, + reason: "extraction-batch-complete:pending", + storageTier: "authority-sql", + }, + }, + }, + }; + + await runExtractionController({ + console, + getIsExtracting: () => false, + getCurrentGraph: () => currentGraph, + getSettings: () => ({ enabled: true, extractEvery: 1 }), + getContext: () => ({ + chat: [ + { is_user: true, mes: "u" }, + { is_user: false, mes: "a" }, + ], + }), + getAssistantTurns: () => [1], + getLastProcessedAssistantFloor: () => 0, + getGraphPersistenceState: () => ({ + loadState: "loaded", + pendingPersist: true, + lastAcceptedRevision: 7, + queuedPersistRevision: 7, + lastRecoverableStorageTier: "none", + acceptedStorageTier: "authority-sql", + }), + ensureGraphMutationReady: () => true, + async retryPendingGraphPersist() { + throw new Error("accepted pending flag should not require retry"); + }, + async recoverHistoryIfNeeded() { + return true; + }, + deferAutoExtraction(reason) { + deferredReasons.push(reason); + }, + setIsExtracting() {}, + beginStageAbortController() { + return { signal: {} }; + }, + setLastExtractionStatus() {}, + async executeExtractionBatch(options) { + executeCalls.push(options); + return { + success: true, + result: { + newNodes: 0, + updatedNodes: 0, + newEdges: 0, + }, + batchStatus: { + persistence: { + accepted: true, + }, + }, + historyAdvanceAllowed: true, + }; + }, + finishStageAbortController() {}, + isAbortError: () => false, + notifyExtractionIssue() {}, + }); + + assert.equal(executeCalls.length, 1); + assert.deepEqual(deferredReasons, []); +} + +async function testAutoExtractionBlocksWhenQueuedRevisionStillPending() { + const deferredReasons = []; + const executeCalls = []; + const statusUpdates = []; + const currentGraph = { + historyState: { + lastBatchStatus: { + processedRange: [1, 1], + persistence: { + outcome: "queued", + accepted: false, + revision: 7, + reason: "extraction-batch-complete:pending", + storageTier: "authority-sql", + }, + }, + }, + }; + + await runExtractionController({ + console, + getIsExtracting: () => false, + getCurrentGraph: () => currentGraph, + getSettings: () => ({ enabled: true, extractEvery: 1 }), + getContext: () => ({ + chat: [ + { is_user: true, mes: "u" }, + { is_user: false, mes: "a" }, + ], + }), + getAssistantTurns: () => [1], + getLastProcessedAssistantFloor: () => 0, + getGraphPersistenceState: () => ({ + loadState: "loaded", + pendingPersist: true, + lastAcceptedRevision: 7, + queuedPersistRevision: 8, + lastRecoverableStorageTier: "none", + acceptedStorageTier: "authority-sql", + }), + ensureGraphMutationReady: () => true, + async retryPendingGraphPersist() { + return { + accepted: false, + reason: "queued-revision-still-pending", + }; + }, + async recoverHistoryIfNeeded() { + return true; + }, + deferAutoExtraction(reason) { + deferredReasons.push(reason); + }, + setIsExtracting() {}, + setLastExtractionStatus(label, text, level) { + statusUpdates.push({ label, text, level }); + }, + async executeExtractionBatch(options) { + executeCalls.push(options); + return { success: true }; + }, + finishStageAbortController() {}, + isAbortError: () => false, + notifyExtractionIssue() {}, + }); + + assert.equal(executeCalls.length, 0); + assert.deepEqual(deferredReasons, ["pending-persist"]); + assert.equal(statusUpdates.at(-1)?.label, "等待持久化确认"); +} + async function testRemoveNodeHandlesCyclicChildGraph() { const graph = createEmptyGraph(); const nodeA = addNode( @@ -8141,6 +8292,8 @@ await testAutoExtractionDefersWhenGraphNotReady(); await testAutoExtractionDefersWhenAlreadyExtracting(); await testAutoExtractionDefersWhenHistoryRecoveryBusy(); await testAutoExtractionContinuesWithRecoverablePendingPersistence(); +await testAutoExtractionIgnoresAcceptedPendingPersistenceFlag(); +await testAutoExtractionBlocksWhenQueuedRevisionStillPending(); await testRemoveNodeHandlesCyclicChildGraph(); await testGenerationRecallAppliesFinalInjectionOncePerTransaction(); await testHistoryGenerationReusesPersistedRecallForStableUserFloor();