Refactor extraction persistence into two-stage status model

This commit is contained in:
Youzini-afk
2026-04-10 01:19:49 +08:00
parent 8f0db97c78
commit 7faa9cfc7f
10 changed files with 1239 additions and 71 deletions

View File

@@ -54,6 +54,68 @@ function normalizeSmartTriggerDecision(decision = null) {
};
}
function normalizePersistenceStateRecord(persistResult = null) {
const accepted = persistResult?.accepted === true;
const queued = persistResult?.queued === true;
const blocked = persistResult?.blocked === true;
let outcome = "failed";
if (accepted && String(persistResult?.storageTier || "") === "indexeddb") {
outcome = "saved";
} else if (accepted) {
outcome = "fallback";
} else if (queued) {
outcome = "queued";
} else if (blocked) {
outcome = "blocked";
}
return {
outcome,
accepted,
storageTier: String(persistResult?.storageTier || "none"),
reason: String(persistResult?.reason || ""),
revision: Number.isFinite(Number(persistResult?.revision))
? Number(persistResult.revision)
: 0,
saveMode: String(persistResult?.saveMode || ""),
saved: persistResult?.saved === true,
queued,
blocked,
};
}
function getPendingPersistenceGateInfo(runtime) {
const graph = runtime?.getCurrentGraph?.();
const batchStatus = graph?.historyState?.lastBatchStatus || null;
const persistence = batchStatus?.persistence || null;
const pendingPersist = runtime?.getGraphPersistenceState?.()?.pendingPersist === true;
const accepted = persistence?.accepted === true;
if (!pendingPersist && (!persistence || accepted)) {
return null;
}
return {
pendingPersist,
accepted,
outcome: String(persistence?.outcome || ""),
reason: String(persistence?.reason || ""),
revision: Number.isFinite(Number(persistence?.revision))
? Number(persistence.revision)
: 0,
};
}
function formatPendingPersistenceGateMessage(runtime, operationLabel = "当前提取") {
const gate = getPendingPersistenceGateInfo(runtime);
if (!gate) return "";
const reason = gate.reason ? ` · ${gate.reason}` : "";
const revision =
Number.isFinite(Number(gate.revision)) && Number(gate.revision) > 0
? ` · rev ${Number(gate.revision)}`
: "";
return `${operationLabel}已暂停:上一批持久化尚未确认,请优先重试持久化或触发恢复${revision}${reason}`;
}
export function resolveAutoExtractionPlanController(
runtime,
{
@@ -255,24 +317,13 @@ export async function executeExtractionBatchController(
batchStatus,
);
const batchStatusRef = effects?.batchStatus || batchStatus;
const persistResult = runtime.saveGraphToChat({
const persistResult = await runtime.persistExtractionBatchResult({
reason: "extraction-batch-complete",
persistMetadata: true,
captureShadow: true,
immediate: true,
lastProcessedAssistantFloor: endIdx,
});
const persistAccepted = Boolean(
persistResult?.saved || persistResult?.queued,
);
if (!persistAccepted) {
runtime.setBatchStageOutcome(
batchStatusRef,
"finalize",
"failed",
`图谱持久化失败: ${persistResult?.reason || "unknown-persist-failure"}`,
);
}
const persistence = normalizePersistenceStateRecord(persistResult);
batchStatusRef.persistence = persistence;
batchStatusRef.historyAdvanceAllowed = persistence.accepted === true;
const finalizedBatchStatus = runtime.finalizeBatchStatus(
batchStatusRef,
runtime.getExtractionCount(),
@@ -280,33 +331,26 @@ export async function executeExtractionBatchController(
runtime.getCurrentGraph().historyState.lastBatchStatus = {
...finalizedBatchStatus,
historyAdvanced: runtime.shouldAdvanceProcessedHistory(finalizedBatchStatus),
persist: persistResult
? {
saved: Boolean(persistResult.saved),
queued: Boolean(persistResult.queued),
blocked: Boolean(persistResult.blocked),
reason: String(persistResult.reason || ""),
saveMode: String(persistResult.saveMode || ""),
revision: Number.isFinite(Number(persistResult.revision))
? Number(persistResult.revision)
: 0,
}
: null,
persistence,
historyAdvanceAllowed: persistence.accepted === true,
historyAdvanced: runtime.shouldAdvanceProcessedHistory({
...finalizedBatchStatus,
historyAdvanceAllowed: persistence.accepted === true,
}),
};
if (runtime.getCurrentGraph().historyState.lastBatchStatus.historyAdvanced) {
runtime.updateProcessedHistorySnapshot(chat, endIdx);
} else if (!persistAccepted) {
} else if (!persistence.accepted) {
runtime.setLastExtractionStatus(
"提取待恢复",
`楼层 ${startIdx}-${endIdx} 已抽取但未确认写盘成功,请稍后重试或检查持久化状态`,
`楼层 ${startIdx}-${endIdx} 已抽取,但持久化状态为 ${persistence.outcome || "failed"}${persistence.reason ? ` · ${persistence.reason}` : ""}`,
"warning",
{ syncRuntime: true },
);
runtime.console?.warn?.("[ST-BME] extraction persist not accepted", {
chatId: runtime.getGraphPersistenceState?.()?.chatId || "",
persist: persistResult,
persistence,
processedRange: [startIdx, endIdx],
});
}
@@ -330,8 +374,13 @@ export async function executeExtractionBatchController(
return {
success: finalizedBatchStatus.completed,
result,
effects,
effects: {
...(effects || {}),
persistResult,
},
batchStatus: finalizedBatchStatus,
persistResult,
historyAdvanceAllowed: persistence.accepted === true,
error: finalizedBatchStatus.completed
? ""
: effects?.vectorError ||
@@ -381,6 +430,27 @@ export async function runExtractionController(runtime, options = {}) {
return;
}
const pendingPersistMessage = formatPendingPersistenceGateMessage(
runtime,
"自动提取",
);
if (pendingPersistMessage) {
runtime.console?.debug?.("[ST-BME] auto extraction paused: pending persistence", {
persistence: runtime.getCurrentGraph?.()?.historyState?.lastBatchStatus?.persistence || null,
});
runtime.deferAutoExtraction?.("pending-persist", {
targetEndFloor: deferredTargetEndFloor,
strategy: plan.strategy,
});
runtime.setLastExtractionStatus(
"等待持久化确认",
pendingPersistMessage,
"warning",
{ syncRuntime: true },
);
return;
}
if (!runtime.getCurrentGraph()) {
runtime.ensureCurrentGraphRuntimeState?.();
}
@@ -436,12 +506,22 @@ export async function runExtractionController(runtime, options = {}) {
return;
}
runtime.setLastExtractionStatus(
"提取完成",
`楼层 ${startIdx}-${endIdx} · 新建 ${batchResult.result?.newNodes || 0} · 更新 ${batchResult.result?.updatedNodes || 0} · 新边 ${batchResult.result?.newEdges || 0}`,
"success",
{ syncRuntime: true },
);
const persistence = batchResult.batchStatus?.persistence || null;
if (batchResult.historyAdvanceAllowed === false) {
runtime.setLastExtractionStatus(
"提取完成,持久化待确认",
`楼层 ${startIdx}-${endIdx} · 新建 ${batchResult.result?.newNodes || 0} · 更新 ${batchResult.result?.updatedNodes || 0} · 新边 ${batchResult.result?.newEdges || 0}${persistence?.reason ? ` · ${persistence.reason}` : ""}`,
"warning",
{ syncRuntime: true },
);
} else {
runtime.setLastExtractionStatus(
"提取完成",
`楼层 ${startIdx}-${endIdx} · 新建 ${batchResult.result?.newNodes || 0} · 更新 ${batchResult.result?.updatedNodes || 0} · 新边 ${batchResult.result?.newEdges || 0}`,
"success",
{ syncRuntime: true },
);
}
} catch (e) {
if (runtime.isAbortError(e)) {
runtime.setLastExtractionStatus(
@@ -468,6 +548,22 @@ export async function onManualExtractController(runtime, options = {}) {
return;
}
if (!runtime.ensureGraphMutationReady("手动提取")) return;
const pendingPersistMessage = formatPendingPersistenceGateMessage(
runtime,
"手动提取",
);
if (pendingPersistMessage) {
runtime.setLastExtractionStatus(
"等待持久化确认",
pendingPersistMessage,
"warning",
{
syncRuntime: true,
},
);
runtime.toastr.warning("上一批持久化尚未确认,请先重试持久化或执行恢复");
return;
}
if (!(await runtime.recoverHistoryIfNeeded("manual-extract"))) return;
if (!runtime.getCurrentGraph()) {
runtime.setCurrentGraph(
@@ -548,6 +644,14 @@ export async function onManualExtractController(runtime, options = {}) {
warnings.push(...batchResult.effects.warnings);
}
if (batchResult.historyAdvanceAllowed === false) {
warnings.push(
batchResult.batchStatus?.persistence?.reason ||
"当前批次持久化尚未确认",
);
break;
}
if (options?.drainAll === false) {
break;
}
@@ -566,19 +670,36 @@ export async function onManualExtractController(runtime, options = {}) {
return;
}
runtime.toastr.success(
`提取完成:${totals.batches} 批,新建 ${totals.newNodes},更新 ${totals.updatedNodes},新边 ${totals.newEdges}`,
);
runtime.setLastExtractionStatus(
"手动提取完成",
`${totals.batches} 批 · 新建 ${totals.newNodes} · 更新 ${totals.updatedNodes} · 新边 ${totals.newEdges}`,
"success",
{
syncRuntime: true,
toastKind: "success",
toastTitle: "ST-BME 手动提取",
},
);
const pendingAfterRun = getPendingPersistenceGateInfo(runtime);
if (pendingAfterRun) {
runtime.toastr.warning(
`提取完成但持久化待确认:${pendingAfterRun.reason || pendingAfterRun.outcome || "unknown"}`,
);
runtime.setLastExtractionStatus(
"手动提取完成,持久化待确认",
`${totals.batches} 批 · 新建 ${totals.newNodes} · 更新 ${totals.updatedNodes} · 新边 ${totals.newEdges}${pendingAfterRun.reason ? ` · ${pendingAfterRun.reason}` : ""}`,
"warning",
{
syncRuntime: true,
toastKind: "",
toastTitle: "ST-BME 手动提取",
},
);
} else {
runtime.toastr.success(
`提取完成:${totals.batches} 批,新建 ${totals.newNodes},更新 ${totals.updatedNodes},新边 ${totals.newEdges}`,
);
runtime.setLastExtractionStatus(
"手动提取完成",
`${totals.batches} 批 · 新建 ${totals.newNodes} · 更新 ${totals.updatedNodes} · 新边 ${totals.newEdges}`,
"success",
{
syncRuntime: true,
toastKind: "success",
toastTitle: "ST-BME 手动提取",
},
);
}
if (warnings.length > 0) {
runtime.toastr.warning(warnings.slice(0, 2).join(""), "ST-BME 提取警告", {
timeOut: 5000,