fix: allow extraction with recoverable pending persist

This commit is contained in:
Youzini-afk
2026-04-23 02:33:44 +08:00
parent 35d5cfd6be
commit 13ccc33f0d
4 changed files with 244 additions and 3 deletions

View File

@@ -11583,7 +11583,7 @@ function queueGraphPersist(
queuedPersistRotateIntegrity: false,
queuedPersistReason: String(reason || ""),
pendingPersist: true,
writesBlocked: true,
writesBlocked: !isRecoveryOnlyPersistTier(effectiveRecoverableTier),
lastPersistReason: String(reason || ""),
lastPersistMode: immediate ? "pending-immediate" : "pending-debounced",
lastRecoverableStorageTier: isRecoveryOnlyPersistTier(effectiveRecoverableTier)

View File

@@ -454,6 +454,28 @@ function isPersistenceRevisionAccepted(runtime, persistence = null) {
return Number.isFinite(lastAcceptedRevision) && lastAcceptedRevision >= persistenceRevision;
}
function hasRecoverablePendingPersistence(runtime) {
const persistenceState = runtime?.getGraphPersistenceState?.() || {};
if (persistenceState.pendingPersist !== true) {
return false;
}
const recoverableTier = String(
persistenceState.lastRecoverableStorageTier || "none",
).trim();
if (recoverableTier === "metadata-full") {
return true;
}
if (recoverableTier !== "shadow") {
return false;
}
const queuedRevision = Number(persistenceState.queuedPersistRevision || 0);
const shadowRevision = Number(persistenceState.shadowSnapshotRevision || 0);
if (!Number.isFinite(queuedRevision) || queuedRevision <= 0) {
return true;
}
return Number.isFinite(shadowRevision) && shadowRevision >= queuedRevision;
}
function getPendingPersistenceGateInfo(runtime) {
const graph = runtime?.getCurrentGraph?.();
const batchStatus = graph?.historyState?.lastBatchStatus || null;
@@ -479,10 +501,14 @@ function getPendingPersistenceGateInfo(runtime) {
async function maybeRetryPendingPersistence(runtime, reason = "pending-persist-retry") {
const gate = getPendingPersistenceGateInfo(runtime);
if (!gate || typeof runtime?.retryPendingGraphPersist !== "function") {
if (!gate) {
return gate;
}
if (typeof runtime?.retryPendingGraphPersist !== "function") {
return hasRecoverablePendingPersistence(runtime) ? null : gate;
}
try {
const retryResult = await runtime.retryPendingGraphPersist({ reason });
if (retryResult?.accepted === true) {
@@ -492,7 +518,11 @@ async function maybeRetryPendingPersistence(runtime, reason = "pending-persist-r
runtime?.console?.warn?.("[ST-BME] pending persistence retry failed", error);
}
return getPendingPersistenceGateInfo(runtime);
const nextGate = getPendingPersistenceGateInfo(runtime);
if (nextGate && hasRecoverablePendingPersistence(runtime)) {
return null;
}
return nextGate;
}
function formatPendingPersistenceGateMessage(runtime, operationLabel = "当前提取") {

View File

@@ -329,6 +329,135 @@ async function testManualExtractIgnoresSupersededPendingPersistence() {
assert.notEqual(context.lastExtractionStatus.text, "等待持久化确认");
}
async function testManualExtractContinuesWithRecoverablePendingPersistence() {
let executeExtractionBatchCalls = 0;
let assistantTurnCallCount = 0;
const chat = [{ is_user: true, mes: "u" }, { is_user: false, mes: "a" }];
const context = {
...createBaseStatusContext(),
isExtracting: false,
graphPersistenceState: {
pendingPersist: true,
lastAcceptedRevision: 0,
queuedPersistRevision: 7,
shadowSnapshotRevision: 7,
lastRecoverableStorageTier: "shadow",
},
currentGraph: {
historyState: {
lastBatchStatus: {
processedRange: [1, 1],
persistence: {
outcome: "queued",
accepted: false,
revision: 7,
reason: "extraction-batch-complete:pending",
storageTier: "shadow",
},
},
},
},
getCurrentChatId() {
return "chat-mobile";
},
getCurrentGraph() {
return context.currentGraph;
},
getIsExtracting() {
return context.isExtracting;
},
getGraphPersistenceState() {
return {
pendingPersist: true,
lastAcceptedRevision: 0,
queuedPersistRevision: 7,
shadowSnapshotRevision: 7,
lastRecoverableStorageTier: "shadow",
};
},
ensureGraphMutationReady() {
return true;
},
async recoverHistoryIfNeeded() {
return true;
},
normalizeGraphRuntimeState(graph) {
return graph;
},
setCurrentGraph(graph) {
context.currentGraph = graph;
},
createEmptyGraph() {
return {};
},
getContext() {
return { chat };
},
getAssistantTurns() {
assistantTurnCallCount += 1;
return assistantTurnCallCount <= 2 ? [1] : [];
},
getLastProcessedAssistantFloor() {
return 0;
},
clampInt(value, fallback) {
return Number.isFinite(Number(value)) ? Number(value) : fallback;
},
getSettings() {
return { extractEvery: 1 };
},
beginStageAbortController() {
return { signal: {} };
},
async executeExtractionBatch() {
executeExtractionBatchCalls += 1;
return {
success: true,
result: {
newNodes: 0,
updatedNodes: 0,
newEdges: 0,
},
effects: {},
batchStatus: {
persistence: {
accepted: true,
},
},
historyAdvanceAllowed: true,
};
},
async retryPendingGraphPersist() {
return {
accepted: false,
reason: "shadow-still-pending",
};
},
isAbortError() {
return false;
},
onManualExtractController,
finishStageAbortController() {},
setIsExtracting(value) {
context.isExtracting = value;
},
setLastExtractionStatus(text, meta, level) {
context.lastExtractionStatus = { text, meta, level };
context.runtimeStatus = { text, meta, level };
},
toastr: {
info() {},
success() {},
warning() {},
error() {},
},
result: null,
};
await onManualExtractController(context, { drainAll: false });
assert.equal(executeExtractionBatchCalls, 1);
assert.notEqual(context.lastExtractionStatus.text, "等待持久化确认");
}
async function testManualExtractIgnoresFailedBatchWithoutPersistenceAttempt() {
let executeExtractionBatchCalls = 0;
const chat = [{ is_user: true, mes: "u" }, { is_user: false, mes: "a" }];
@@ -567,6 +696,7 @@ testIndexDefinesLastProcessedAssistantFloorHelper();
await testVectorSyncTerminalStateUpdatesRuntime();
await testManualExtractNoBatchesDoesNotStayRunning();
await testManualExtractIgnoresSupersededPendingPersistence();
await testManualExtractContinuesWithRecoverablePendingPersistence();
await testManualExtractIgnoresFailedBatchWithoutPersistenceAttempt();
await testManualRebuildSetsTerminalRuntimeStatus();

View File

@@ -4902,6 +4902,86 @@ async function testAutoExtractionDefersWhenHistoryRecoveryBusy() {
assert.deepEqual(deferredReasons, ["history-recovering"]);
}
async function testAutoExtractionContinuesWithRecoverablePendingPersistence() {
const deferredReasons = [];
const executeCalls = [];
const currentGraph = {
historyState: {
lastBatchStatus: {
processedRange: [1, 1],
persistence: {
outcome: "queued",
accepted: false,
revision: 7,
reason: "extraction-batch-complete:pending",
storageTier: "shadow",
},
},
},
};
await runExtractionController({
console,
getIsExtracting: () => false,
getCurrentGraph: () => currentGraph,
getSettings: () => ({ enabled: true, extractEvery: 1 }),
getContext: () => ({
chat: [{ is_user: true, mes: "u" }, { is_user: false, mes: "a" }],
}),
getAssistantTurns: () => [1],
getLastProcessedAssistantFloor: () => 0,
getGraphPersistenceState: () => ({
loadState: "loaded",
pendingPersist: true,
lastAcceptedRevision: 0,
queuedPersistRevision: 7,
shadowSnapshotRevision: 7,
lastRecoverableStorageTier: "shadow",
}),
ensureGraphMutationReady: () => true,
async retryPendingGraphPersist() {
return {
accepted: false,
reason: "shadow-still-pending",
};
},
async recoverHistoryIfNeeded() {
return true;
},
deferAutoExtraction(reason) {
deferredReasons.push(reason);
},
setIsExtracting() {},
beginStageAbortController() {
return { signal: {} };
},
setLastExtractionStatus() {},
async executeExtractionBatch(options) {
executeCalls.push(options);
return {
success: true,
result: {
newNodes: 0,
updatedNodes: 0,
newEdges: 0,
},
batchStatus: {
persistence: {
accepted: true,
},
},
historyAdvanceAllowed: true,
};
},
finishStageAbortController() {},
isAbortError: () => false,
notifyExtractionIssue() {},
});
assert.equal(executeCalls.length, 1);
assert.deepEqual(deferredReasons, []);
}
async function testRemoveNodeHandlesCyclicChildGraph() {
const graph = createEmptyGraph();
const nodeA = addNode(
@@ -7415,6 +7495,7 @@ await testLagModePendingResumeKeepsLockedPreviousAssistantAfterLatestDisappears(
await testAutoExtractionDefersWhenGraphNotReady();
await testAutoExtractionDefersWhenAlreadyExtracting();
await testAutoExtractionDefersWhenHistoryRecoveryBusy();
await testAutoExtractionContinuesWithRecoverablePendingPersistence();
await testRemoveNodeHandlesCyclicChildGraph();
await testGenerationRecallAppliesFinalInjectionOncePerTransaction();
await testHistoryGenerationReusesPersistedRecallForStableUserFloor();