From b7c077ed2134762b74fd18c5adb28e5ed720730b Mon Sep 17 00:00:00 2001 From: Youzini-afk <13153778771cx@gmail.com> Date: Wed, 29 Apr 2026 15:39:54 +0800 Subject: [PATCH] Implement background maintenance post-process queue --- index.js | 722 +++++++++++++++++++++++---- maintenance/extraction-controller.js | 30 ++ tests/p0-regressions.mjs | 202 ++++++++ ui/panel.html | 4 + ui/panel.js | 38 ++ ui/ui-status.js | 10 + 6 files changed, 911 insertions(+), 95 deletions(-) diff --git a/index.js b/index.js index ced6311..58c56c0 100644 --- a/index.js +++ b/index.js @@ -136,6 +136,10 @@ import { rollupSummaryFrontier, runHierarchicalSummaryPostProcess, } from "./maintenance/hierarchical-summary.js"; +import { + createDefaultSummaryState, + normalizeGraphSummaryState, +} from "./graph/summary-state.js"; import { appendLukerGraphJournalEntryV2, buildGraphCommitMarker, @@ -197,6 +201,8 @@ import { unhideAll, } from "./ui/hide-engine.js"; import { + addEdge, + addNode, createEmptyGraph, deserializeGraph, exportGraph, @@ -16200,6 +16206,348 @@ function shouldDeferExtractionVectorSync(settings = {}) { return resolveMaintenancePostProcessConcurrency(settings).mode !== "strict"; } +function shouldDeferExtractionMaintenance(settings = {}) { + return resolveMaintenancePostProcessConcurrency(settings).mode !== "strict"; +} + +function clonePlanCommitValue(value, fallback = null) { + try { + if (typeof cloneRuntimeDebugValue === "function") { + return cloneRuntimeDebugValue(value, fallback); + } + } catch { + } + try { + return JSON.parse(JSON.stringify(value ?? fallback)); + } catch { + return fallback; + } +} + +function arePlanCommitValuesEqual(left, right) { + try { + return JSON.stringify(left ?? null) === JSON.stringify(right ?? null); + } catch { + return false; + } +} + +function normalizeSummaryStateForPlan(state = {}) { + try { + if (typeof createDefaultSummaryState === "function") { + return createDefaultSummaryState(state); + } + } catch { + } + const source = + state && typeof state === "object" && !Array.isArray(state) ? state : {}; + return { + version: Number(source.version || 1) || 1, + enabled: source.enabled !== false, + entries: Array.isArray(source.entries) + ? clonePlanCommitValue(source.entries, []) + : [], + activeEntryIds: Array.isArray(source.activeEntryIds) + ? [...new Set(source.activeEntryIds.map((id) => String(id || "").trim()).filter(Boolean))] + : [], + lastSummarizedExtractionCount: Number.isFinite( + Number(source.lastSummarizedExtractionCount), + ) + ? Math.max(0, Number(source.lastSummarizedExtractionCount)) + : 0, + lastSummarizedAssistantFloor: Number.isFinite( + Number(source.lastSummarizedAssistantFloor), + ) + ? Number(source.lastSummarizedAssistantFloor) + : -1, + }; +} + +function normalizeGraphSummaryStateForPlan(graph) { + if (!graph || typeof graph !== "object") return graph; + try { + if (typeof normalizeGraphSummaryState === "function") { + return normalizeGraphSummaryState(graph); + } + } catch { + } + graph.summaryState = normalizeSummaryStateForPlan(graph.summaryState); + return graph; +} + +function commitPlannedSummaryState(targetGraph, beforeState = {}, draftState = {}) { + if (!targetGraph || typeof targetGraph !== "object") { + return { + summaryEntriesAdded: 0, + summaryEntriesUpdated: 0, + summaryEntriesFolded: 0, + }; + } + normalizeGraphSummaryStateForPlan(targetGraph); + const before = normalizeSummaryStateForPlan(beforeState); + const draft = normalizeSummaryStateForPlan(draftState); + const target = normalizeSummaryStateForPlan(targetGraph.summaryState); + const beforeMap = new Map(before.entries.map((entry) => [entry.id, entry])); + const targetMap = new Map(target.entries.map((entry) => [entry.id, entry])); + const activeIds = new Set(target.activeEntryIds || []); + let summaryEntriesAdded = 0; + let summaryEntriesUpdated = 0; + let summaryEntriesFolded = 0; + + for (const draftEntry of draft.entries) { + const entryId = String(draftEntry?.id || "").trim(); + if (!entryId) continue; + const beforeEntry = beforeMap.get(entryId) || null; + if (beforeEntry && arePlanCommitValuesEqual(beforeEntry, draftEntry)) { + continue; + } + const clonedEntry = clonePlanCommitValue(draftEntry, draftEntry); + const targetEntry = targetMap.get(entryId) || null; + if (targetEntry) { + Object.assign(targetEntry, clonedEntry); + summaryEntriesUpdated += 1; + } else { + target.entries.push(clonedEntry); + targetMap.set(entryId, clonedEntry); + summaryEntriesAdded += 1; + } + if (String(clonedEntry.status || "active") === "folded") { + activeIds.delete(entryId); + if (beforeEntry && String(beforeEntry.status || "active") !== "folded") { + summaryEntriesFolded += 1; + } + } else { + activeIds.add(entryId); + } + } + + target.lastSummarizedExtractionCount = Math.max( + Number(target.lastSummarizedExtractionCount || 0), + Number(draft.lastSummarizedExtractionCount || 0), + ); + target.lastSummarizedAssistantFloor = Math.max( + Number(target.lastSummarizedAssistantFloor ?? -1), + Number(draft.lastSummarizedAssistantFloor ?? -1), + ); + target.activeEntryIds = [...activeIds].filter( + (entryId) => String(targetMap.get(entryId)?.status || "active") !== "folded", + ); + targetGraph.summaryState = target; + normalizeGraphSummaryStateForPlan(targetGraph); + return { + summaryEntriesAdded, + summaryEntriesUpdated, + summaryEntriesFolded, + }; +} + +function commitPlannedGraphChanges({ + targetGraph = currentGraph, + beforeSnapshot = null, + draftGraph = null, + includeSummaryState = true, +} = {}) { + const stats = { + nodesAdded: 0, + nodesUpdated: 0, + edgesAdded: 0, + summaryEntriesAdded: 0, + summaryEntriesUpdated: 0, + summaryEntriesFolded: 0, + }; + if (!targetGraph || !beforeSnapshot || !draftGraph) return stats; + targetGraph.nodes ||= []; + targetGraph.edges ||= []; + const beforeNodes = new Map( + (beforeSnapshot.nodes || []).map((node) => [String(node?.id || ""), node]), + ); + const targetNodes = new Map( + (targetGraph.nodes || []).map((node) => [String(node?.id || ""), node]), + ); + + for (const draftNode of draftGraph.nodes || []) { + const nodeId = String(draftNode?.id || "").trim(); + if (!nodeId) continue; + const beforeNode = beforeNodes.get(nodeId) || null; + if (beforeNode && arePlanCommitValuesEqual(beforeNode, draftNode)) continue; + const clonedNode = clonePlanCommitValue(draftNode, draftNode); + const targetNode = targetNodes.get(nodeId) || null; + if (!targetNode) { + if (typeof addNode === "function") { + addNode(targetGraph, clonedNode); + } else { + targetGraph.nodes.push(clonedNode); + } + targetNodes.set(nodeId, clonedNode); + stats.nodesAdded += 1; + } else { + if (typeof updateNode === "function") { + updateNode(targetGraph, nodeId, clonePlanCommitValue(clonedNode, clonedNode)); + } else { + Object.assign(targetNode, clonedNode); + } + stats.nodesUpdated += 1; + } + } + + const beforeEdgeIds = new Set( + (beforeSnapshot.edges || []).map((edge) => String(edge?.id || "").trim()), + ); + const targetEdgeIds = new Set( + (targetGraph.edges || []).map((edge) => String(edge?.id || "").trim()), + ); + for (const draftEdge of draftGraph.edges || []) { + const edgeId = String(draftEdge?.id || "").trim(); + if (!edgeId || beforeEdgeIds.has(edgeId) || targetEdgeIds.has(edgeId)) continue; + const clonedEdge = clonePlanCommitValue(draftEdge, draftEdge); + if (typeof addEdge === "function") { + addEdge(targetGraph, clonedEdge); + } else { + targetGraph.edges.push(clonedEdge); + } + targetEdgeIds.add(edgeId); + stats.edgesAdded += 1; + } + + if (includeSummaryState) { + Object.assign( + stats, + commitPlannedSummaryState( + targetGraph, + beforeSnapshot.summaryState, + draftGraph.summaryState, + ), + ); + } + return stats; +} + +function getSummaryPostProcessRunner() { + if (typeof runHierarchicalSummaryPostProcess === "function") { + return runHierarchicalSummaryPostProcess; + } + if (typeof generateSynopsis === "function") { + return async (params = {}) => { + await generateSynopsis({ + graph: params.graph, + schema: typeof getSchema === "function" ? getSchema() : [], + currentSeq: params.currentAssistantFloor, + settings: params.settings, + signal: params.signal, + }); + return { + created: true, + smallSummary: { created: true, reason: "" }, + rollup: null, + }; + }; + } + return async () => ({ + created: false, + smallSummary: { + created: false, + reason: "层级总结运行器不可用,已跳过", + }, + rollup: null, + }); +} + +function getSummaryStageLabel() { + if (typeof runHierarchicalSummaryPostProcess === "function") return "层级总结"; + if (typeof generateSynopsis === "function") return "旧式全局概要生成"; + return "层级总结"; +} + +async function runSummaryPostProcessPlanCommit(params = {}) { + const runner = getSummaryPostProcessRunner(); + const settings = params.settings || {}; + if (resolveMaintenancePostProcessConcurrency(settings).mode === "strict") { + return await runner(params); + } + const beforeSnapshot = clonePlanCommitValue(params.graph, params.graph); + const draftGraph = clonePlanCommitValue(params.graph, params.graph); + const result = await runner({ + ...params, + graph: draftGraph, + }); + const planCommit = commitPlannedGraphChanges({ + targetGraph: params.graph, + beforeSnapshot, + draftGraph, + }); + return { + ...(result && typeof result === "object" && !Array.isArray(result) + ? result + : { created: Boolean(result) }), + planCommit, + }; +} + +async function runReflectionPostProcessPlanCommit(params = {}) { + const settings = params.settings || {}; + if (resolveMaintenancePostProcessConcurrency(settings).mode === "strict") { + const reflectionId = await generateReflection(params); + return { reflectionId, planCommit: null }; + } + const beforeSnapshot = clonePlanCommitValue(params.graph, params.graph); + const draftGraph = clonePlanCommitValue(params.graph, params.graph); + const reflectionId = await generateReflection({ + ...params, + graph: draftGraph, + }); + const planCommit = commitPlannedGraphChanges({ + targetGraph: params.graph, + beforeSnapshot, + draftGraph, + }); + return { reflectionId, planCommit }; +} + +async function runCompressionPostProcessPlanCommit({ + graph, + schema = [], + embeddingConfig = null, + force = false, + customPrompt = undefined, + signal = undefined, + settings = {}, +} = {}) { + if (resolveMaintenancePostProcessConcurrency(settings).mode === "strict") { + return await compressAll( + graph, + schema, + embeddingConfig, + force, + customPrompt, + signal, + settings, + ); + } + const beforeSnapshot = clonePlanCommitValue(graph, graph); + const draftGraph = clonePlanCommitValue(graph, graph); + const result = await compressAll( + draftGraph, + schema, + embeddingConfig, + force, + customPrompt, + signal, + settings, + ); + const planCommit = commitPlannedGraphChanges({ + targetGraph: graph, + beforeSnapshot, + draftGraph, + includeSummaryState: false, + }); + return { + ...(result && typeof result === "object" && !Array.isArray(result) + ? result + : { created: 0, archived: 0 }), + planCommit, + }; +} + function updateBackgroundMaintenanceQueueState(snapshot = null) { const normalized = snapshot && typeof snapshot === "object" && !Array.isArray(snapshot) @@ -16431,6 +16779,142 @@ function scheduleBackgroundVectorSync(task = null, settings = {}) { ); } +function hasPlanCommitChanges(planCommit = null) { + if (!planCommit || typeof planCommit !== "object") return false; + return [ + "nodesAdded", + "nodesUpdated", + "edgesAdded", + "summaryEntriesAdded", + "summaryEntriesUpdated", + "summaryEntriesFolded", + ].some((key) => Number(planCommit[key] || 0) > 0); +} + +function scheduleBackgroundMaintenancePostProcess(tasks = [], settings = {}) { + const taskList = Array.isArray(tasks) + ? tasks.filter((task) => task && typeof task === "object" && task.type) + : []; + if (!taskList.length) { + return { + queued: false, + reason: "no-background-maintenance-tasks", + snapshot: updateBackgroundMaintenanceQueueState(null), + }; + } + const scheduledSettings = clonePlanCommitValue(settings, settings) || settings; + const mode = resolveMaintenancePostProcessConcurrency(scheduledSettings).mode; + const taskId = taskList.map((task) => String(task.id || task.type)).join("+"); + return enqueueBackgroundMaintenanceTask( + "post-process", + async () => { + await new Promise((resolve) => setTimeout(resolve, 0)); + ensureCurrentGraphRuntimeState(); + const details = []; + let changed = false; + if (typeof setLastExtractionStatus === "function") { + setLastExtractionStatus( + "后台维护中", + `${mode} 模式 · 正在执行 ${taskList.map((task) => task.type).join(" / ")}`, + "running", + { syncRuntime: false }, + ); + } + for (const task of taskList) { + const type = String(task.type || "").trim(); + const payload = + task.payload && typeof task.payload === "object" && !Array.isArray(task.payload) + ? task.payload + : {}; + if (type === "summary") { + const result = await runSummaryPostProcessPlanCommit({ + graph: currentGraph, + chat: Array.isArray(payload.chat) + ? payload.chat + : typeof getContext === "function" && Array.isArray(getContext()?.chat) + ? getContext().chat + : [], + settings: scheduledSettings, + currentExtractionCount: Number(payload.currentExtractionCount || 0) || extractionCount, + currentAssistantFloor: Number(payload.currentAssistantFloor ?? -1), + currentRange: Array.isArray(payload.currentRange) ? payload.currentRange : null, + currentNodeIds: Array.isArray(payload.currentNodeIds) ? payload.currentNodeIds : [], + }); + const taskChanged = + Boolean(result?.smallSummary?.created) || + Number(result?.rollup?.createdCount || 0) > 0 || + hasPlanCommitChanges(result?.planCommit); + changed = changed || taskChanged; + details.push({ type, changed: taskChanged, result }); + } else if (type === "reflection") { + const result = await runReflectionPostProcessPlanCommit({ + graph: currentGraph, + currentSeq: Number(payload.currentSeq ?? -1), + schema: getSchema(), + embeddingConfig: getEmbeddingConfig(), + settings: scheduledSettings, + }); + const taskChanged = + Boolean(result?.reflectionId) || hasPlanCommitChanges(result?.planCommit); + changed = changed || taskChanged; + details.push({ type, changed: taskChanged, result }); + } else if (type === "compression") { + const beforeSnapshot = + typeof cloneGraphSnapshot === "function" + ? cloneGraphSnapshot(currentGraph) + : clonePlanCommitValue(currentGraph, currentGraph); + const result = await runCompressionPostProcessPlanCommit({ + graph: currentGraph, + schema: getSchema(), + embeddingConfig: getEmbeddingConfig(), + force: Boolean(payload.force), + customPrompt: payload.customPrompt ?? undefined, + settings: scheduledSettings, + }); + const taskChanged = + Number(result?.created || 0) > 0 || + Number(result?.archived || 0) > 0 || + hasPlanCommitChanges(result?.planCommit); + if (taskChanged) { + const compressionSummary = + typeof buildMaintenanceSummary === "function" + ? buildMaintenanceSummary("compress", result, "auto") + : `自动压缩:新增 ${result?.created || 0},归档 ${result?.archived || 0}`; + if (typeof recordMaintenanceAction === "function") { + recordMaintenanceAction({ + action: "compress", + beforeSnapshot, + mode: "auto", + summary: compressionSummary, + }); + } + } + changed = changed || taskChanged; + details.push({ type, changed: taskChanged, result }); + } + } + if (changed) { + saveGraphToChat({ + reason: `background-post-process:${taskList.map((task) => task.type).join("+")}`, + }); + } + if (typeof setLastExtractionStatus === "function") { + setLastExtractionStatus( + changed ? "后台维护完成" : "后台维护跳过", + changed ? "后台维护已完成并持久化" : "后台维护未产生可持久化变化", + changed ? "success" : "warning", + { syncRuntime: false }, + ); + } + return { changed, details }; + }, + scheduledSettings, + { + id: `post-process:${taskId}`, + }, + ); +} + async function ensureVectorReadyIfNeeded( reason = "vector-ready-check", signal = undefined, @@ -19795,38 +20279,8 @@ async function handleExtractionSuccess( return `${prefix}维护已执行`; } }; - const runSummaryPostProcess = - typeof runHierarchicalSummaryPostProcess === "function" - ? runHierarchicalSummaryPostProcess - : typeof generateSynopsis === "function" - ? async (params = {}) => { - await generateSynopsis({ - graph: params.graph, - schema: typeof getSchema === "function" ? getSchema() : [], - currentSeq: params.currentAssistantFloor, - settings: params.settings, - signal: params.signal, - }); - return { - created: true, - smallSummary: { created: true, reason: "" }, - rollup: null, - }; - } - : async () => ({ - created: false, - smallSummary: { - created: false, - reason: "层级总结运行器不可用,已跳过", - }, - rollup: null, - }); - const summaryStageLabel = - typeof runHierarchicalSummaryPostProcess === "function" - ? "层级总结" - : typeof generateSynopsis === "function" - ? "旧式全局概要生成" - : "层级总结"; + const runSummaryPostProcess = runSummaryPostProcessPlanCommit; + const summaryStageLabel = getSummaryStageLabel(); const cloneMaintenanceSnapshot = typeof cloneGraphSnapshot === "function" ? cloneGraphSnapshot @@ -19846,6 +20300,32 @@ async function handleExtractionSuccess( noticeMarquee, }); }; + const deferredMaintenance = []; + const maintenancePostProcessConcurrency = + resolveMaintenancePostProcessConcurrency(settings); + const enqueueDeferredMaintenance = (task) => { + if (!task || typeof task !== "object" || !task.type) return null; + const normalizedTask = { + id: String(task.id || `${task.type}:${Date.now()}:${endIdx}`), + type: String(task.type), + mode: maintenancePostProcessConcurrency.mode, + reason: String(task.reason || `background-${task.type}-after-extraction`), + payload: + task.payload && typeof task.payload === "object" && !Array.isArray(task.payload) + ? clonePlanCommitValue(task.payload, {}) + : {}, + }; + deferredMaintenance.push(normalizedTask); + status.backgroundMaintenanceQueued = true; + status.backgroundMaintenanceMode = normalizedTask.mode; + status.backgroundMaintenanceTasks = deferredMaintenance.map((item) => ({ + id: item.id, + type: item.type, + reason: item.reason, + })); + pushBatchStageArtifact(status, "finalize", `${normalizedTask.type}-queued`); + return normalizedTask; + }; throwIfAborted(signal, "提取已终止"); extractionCount++; ensureCurrentGraphRuntimeState(); @@ -19974,31 +20454,48 @@ async function handleExtractionSuccess( typeof getContext === "function" && Array.isArray(getContext()?.chat) ? getContext().chat : []; - updateExtractionPostProcessStatus( - summaryStageLabel === "旧式全局概要生成" ? "旧式全局概要更新中" : "层级总结处理中", - summaryStageLabel === "旧式全局概要生成" - ? `${extractionCount} 次提取,正在生成旧式全局概要` - : `${extractionCount} 次提取,正在检查小总结与折叠总结`, - ); - const summaryResult = await runSummaryPostProcess({ - graph: currentGraph, - chat: currentChatMessages, - settings, - signal, + const summaryPayload = { + chat: clonePlanCommitValue(currentChatMessages, []), currentExtractionCount: extractionCount, currentAssistantFloor: endIdx, currentRange: result?.processedRange || [endIdx, endIdx], currentNodeIds: result?.changedNodeIds || result?.newNodeIds || [], - }); - if (summaryResult?.smallSummary?.created) { - postProcessArtifacts.push("summary"); - pushBatchStageArtifact(status, "semantic", "summary"); - } else if (summaryResult?.smallSummary?.reason) { - applyMaintenanceGateNote(status, "summary", summaryResult.smallSummary.reason); - } - if (Number(summaryResult?.rollup?.createdCount || 0) > 0) { - postProcessArtifacts.push("summary-rollup"); - pushBatchStageArtifact(status, "semantic", "summary-rollup"); + }; + if (shouldDeferExtractionMaintenance(settings)) { + enqueueDeferredMaintenance({ + type: "summary", + reason: "background-summary-after-extraction", + payload: summaryPayload, + }); + updateExtractionPostProcessStatus( + "层级总结已排队", + `${maintenancePostProcessConcurrency.mode} 模式:层级总结将在批次持久化后后台执行`, + ); + pushBatchStageArtifact(status, "semantic", "summary-queued"); + } else { + updateExtractionPostProcessStatus( + summaryStageLabel === "旧式全局概要生成" ? "旧式全局概要更新中" : "层级总结处理中", + summaryStageLabel === "旧式全局概要生成" + ? `${extractionCount} 次提取,正在生成旧式全局概要` + : `${extractionCount} 次提取,正在检查小总结与折叠总结`, + ); + const summaryResult = await runSummaryPostProcess({ + graph: currentGraph, + chat: currentChatMessages, + settings, + signal, + ...summaryPayload, + }); + if (summaryResult?.smallSummary?.created) { + postProcessArtifacts.push("summary"); + pushBatchStageArtifact(status, "semantic", "summary"); + } else if (summaryResult?.smallSummary?.reason) { + applyMaintenanceGateNote(status, "summary", summaryResult.smallSummary.reason); + } + if (Number(summaryResult?.rollup?.createdCount || 0) > 0) { + postProcessArtifacts.push("summary-rollup"); + pushBatchStageArtifact(status, "semantic", "summary-rollup"); + } } } catch (e) { if (isAbortError(e)) throw e; @@ -20018,20 +20515,36 @@ async function handleExtractionSuccess( extractionCount % settings.reflectEveryN === 0 ) { try { - updateExtractionPostProcessStatus( - "反思生成中", - `${extractionCount} 次提取,正在生成长期反思`, - ); - await generateReflection({ - graph: currentGraph, - currentSeq: endIdx, - schema: getSchema(), - embeddingConfig: getEmbeddingConfig(), - settings, - signal, - }); - postProcessArtifacts.push("reflection"); - pushBatchStageArtifact(status, "semantic", "reflection"); + const reflectionPayload = { currentSeq: endIdx }; + if (shouldDeferExtractionMaintenance(settings)) { + enqueueDeferredMaintenance({ + type: "reflection", + reason: "background-reflection-after-extraction", + payload: reflectionPayload, + }); + updateExtractionPostProcessStatus( + "反思生成已排队", + `${maintenancePostProcessConcurrency.mode} 模式:长期反思将在批次持久化后后台执行`, + ); + pushBatchStageArtifact(status, "semantic", "reflection-queued"); + } else { + updateExtractionPostProcessStatus( + "反思生成中", + `${extractionCount} 次提取,正在生成长期反思`, + ); + const reflectionResult = await runReflectionPostProcessPlanCommit({ + graph: currentGraph, + currentSeq: endIdx, + schema: getSchema(), + embeddingConfig: getEmbeddingConfig(), + settings, + signal, + }); + if (reflectionResult?.reflectionId) { + postProcessArtifacts.push("reflection"); + pushBatchStageArtifact(status, "semantic", "reflection"); + } + } } catch (e) { if (isAbortError(e)) throw e; const message = e?.message || String(e) || "反思生成阶段失败"; @@ -20101,37 +20614,53 @@ async function handleExtractionSuccess( "已到自动压缩周期,但当前没有达到内部压缩阈值的候选组"; pushBatchStageArtifact(status, "structural", "compression-skipped"); } else { - updateExtractionPostProcessStatus( - "自动压缩中", - `已到第 ${extractionCount} 次提取周期,正在压缩层级记忆`, - ); status.autoCompressionSkippedReason = ""; - const beforeSnapshot = cloneMaintenanceSnapshot(currentGraph); - const compressionResult = await compressAll( - currentGraph, - getSchema(), - getEmbeddingConfig(), - false, - undefined, - signal, - settings, - ); - if (compressionResult.created > 0 || compressionResult.archived > 0) { - persistMaintenanceAction({ - action: "compress", - beforeSnapshot, - mode: "auto", - summary: summarizeMaintenance( - "compress", - compressionResult, - "auto", - ), + if (shouldDeferExtractionMaintenance(settings)) { + enqueueDeferredMaintenance({ + type: "compression", + reason: "background-compression-after-extraction", + payload: { + force: false, + customPrompt: null, + }, }); - postProcessArtifacts.push("compression"); - pushBatchStageArtifact(status, "structural", "compression"); + updateExtractionPostProcessStatus( + "自动压缩已排队", + `${maintenancePostProcessConcurrency.mode} 模式:层级压缩将在批次持久化后后台执行`, + ); + pushBatchStageArtifact(status, "structural", "compression-queued"); } else { - status.autoCompressionSkippedReason = - "已尝试自动压缩,但本轮未产生可持久化变化"; + updateExtractionPostProcessStatus( + "自动压缩中", + `已到第 ${extractionCount} 次提取周期,正在压缩层级记忆`, + ); + const beforeSnapshot = cloneMaintenanceSnapshot(currentGraph); + const compressionResult = await runCompressionPostProcessPlanCommit({ + graph: currentGraph, + schema: getSchema(), + embeddingConfig: getEmbeddingConfig(), + force: false, + customPrompt: undefined, + signal, + settings, + }); + if (compressionResult.created > 0 || compressionResult.archived > 0) { + persistMaintenanceAction({ + action: "compress", + beforeSnapshot, + mode: "auto", + summary: summarizeMaintenance( + "compress", + compressionResult, + "auto", + ), + }); + postProcessArtifacts.push("compression"); + pushBatchStageArtifact(status, "structural", "compression"); + } else { + status.autoCompressionSkippedReason = + "已尝试自动压缩,但本轮未产生可持久化变化"; + } } } } @@ -20247,6 +20776,7 @@ async function handleExtractionSuccess( warnings: status.warnings, batchStatus: finalizeBatchStatus(status, extractionCount), backgroundVectorSync: null, + backgroundMaintenance: deferredMaintenance, }; } @@ -20285,6 +20815,7 @@ async function handleExtractionSuccess( warnings: status.warnings, batchStatus: finalizeBatchStatus(status, extractionCount), backgroundVectorSync, + backgroundMaintenance: deferredMaintenance, }; } @@ -20596,6 +21127,7 @@ async function executeExtractionBatch({ getSchema, handleExtractionSuccess, persistExtractionBatchResult, + scheduleBackgroundMaintenancePostProcess, scheduleBackgroundVectorSync, setBatchStageOutcome, setLastExtractionStatus, diff --git a/maintenance/extraction-controller.js b/maintenance/extraction-controller.js index 458bdbf..618b791 100644 --- a/maintenance/extraction-controller.js +++ b/maintenance/extraction-controller.js @@ -909,6 +909,35 @@ export async function executeExtractionBatchController( const persistence = normalizePersistenceStateRecord(persistResult); batchStatusRef.persistence = persistence; batchStatusRef.historyAdvanceAllowed = persistence.accepted === true; + let backgroundMaintenanceQueue = null; + if ( + persistence.accepted === true && + Array.isArray(effects?.backgroundMaintenance) && + effects.backgroundMaintenance.length > 0 && + typeof runtime.scheduleBackgroundMaintenancePostProcess === "function" + ) { + backgroundMaintenanceQueue = runtime.scheduleBackgroundMaintenancePostProcess( + effects.backgroundMaintenance, + settings, + ); + batchStatusRef.backgroundMaintenanceState = + backgroundMaintenanceQueue?.queued === true ? "queued" : "queue-failed"; + batchStatusRef.backgroundMaintenanceQueue = + cloneSerializable(backgroundMaintenanceQueue, backgroundMaintenanceQueue); + if (backgroundMaintenanceQueue?.queued !== true) { + runtime.setBatchStageOutcome( + batchStatusRef, + "finalize", + "partial", + `后台维护入队失败: ${backgroundMaintenanceQueue?.reason || "queue-failed"}`, + ); + } + } else if ( + Array.isArray(effects?.backgroundMaintenance) && + effects.backgroundMaintenance.length > 0 + ) { + batchStatusRef.backgroundMaintenanceState = "blocked-by-persistence"; + } let backgroundVectorSyncQueue = null; if ( persistence.accepted === true && @@ -985,6 +1014,7 @@ export async function executeExtractionBatchController( effects: { ...(effects || {}), persistResult, + backgroundMaintenanceQueue, backgroundVectorSyncQueue, }, batchStatus: finalizedBatchStatus, diff --git a/tests/p0-regressions.mjs b/tests/p0-regressions.mjs index a34b69d..b9cc331 100644 --- a/tests/p0-regressions.mjs +++ b/tests/p0-regressions.mjs @@ -3583,6 +3583,98 @@ async function testBalancedModeDefersExtractionVectorSync() { assert.equal(effects.batchStatus.backgroundVectorSyncQueued, true); } +async function testBalancedModeDefersExtractionMaintenancePostProcess() { + const harness = await createBatchStageHarness(); + const { createBatchStatusSkeleton, handleExtractionSuccess } = harness.result; + let synopsisCalls = 0; + let reflectionCalls = 0; + let compressionCalls = 0; + let vectorSyncCalls = 0; + harness.currentGraph = { + nodes: [], + edges: [], + historyState: { extractionCount: 0 }, + vectorIndexState: {}, + }; + harness.ensureCurrentGraphRuntimeState = () => { + harness.currentGraph.historyState ||= {}; + harness.currentGraph.vectorIndexState ||= {}; + }; + harness.generateSynopsis = async () => { + synopsisCalls += 1; + harness.currentGraph.nodes.push({ id: "synopsis-queued", type: "synopsis" }); + return { ok: true }; + }; + harness.generateReflection = async () => { + reflectionCalls += 1; + harness.currentGraph.nodes.push({ id: "reflection-queued", type: "reflection" }); + return "reflection-queued"; + }; + harness.inspectAutoCompressionCandidates = () => ({ + hasCandidates: true, + reason: "", + }); + harness.compressAll = async () => { + compressionCalls += 1; + harness.currentGraph.nodes.push({ id: "compression-queued", level: 1 }); + return { created: 1, archived: 0 }; + }; + harness.syncVectorState = async () => { + vectorSyncCalls += 1; + return { + insertedHashes: ["should-not-run"], + stats: { pending: 0 }, + }; + }; + + const batchStatus = createBatchStatusSkeleton({ + processedRange: [21, 21], + extractionCountBefore: 0, + }); + const effects = await handleExtractionSuccess( + { + newNodeIds: ["node-maintenance-bg"], + processedRange: [21, 21], + }, + 21, + { + maintenanceExecutionMode: "balanced", + enableConsolidation: false, + enableHierarchicalSummary: true, + enableReflection: true, + reflectEveryN: 1, + enableSleepCycle: false, + sleepEveryN: 1, + enableAutoCompression: true, + compressionEveryN: 1, + }, + undefined, + batchStatus, + ); + + assert.equal(synopsisCalls, 0); + assert.equal(reflectionCalls, 0); + assert.equal(compressionCalls, 0); + assert.equal(vectorSyncCalls, 0); + assert.deepEqual( + Array.from(effects.backgroundMaintenance, (task) => task.type), + ["summary", "reflection", "compression"], + ); + assert.equal(effects.batchStatus.backgroundMaintenanceQueued, true); + assert.equal(effects.batchStatus.backgroundMaintenanceMode, "balanced"); + assert.ok( + effects.batchStatus.stages.semantic.artifacts.includes("summary-queued"), + ); + assert.ok( + effects.batchStatus.stages.semantic.artifacts.includes("reflection-queued"), + ); + assert.ok( + effects.batchStatus.stages.structural.artifacts.includes("compression-queued"), + ); + assert.equal(effects.backgroundVectorSync?.enabled, true); + assert.equal(effects.backgroundVectorSync?.mode, "balanced"); +} + async function testBackgroundVectorSyncScheduledAfterAcceptedPersistence() { const graph = { nodes: [], @@ -3684,6 +3776,114 @@ async function testBackgroundVectorSyncScheduledAfterAcceptedPersistence() { assert.equal(result.effects.backgroundVectorSyncQueue?.queued, true); } +async function testBackgroundMaintenanceScheduledAfterAcceptedPersistence() { + const graph = { + nodes: [], + edges: [], + historyState: { extractionCount: 0 }, + vectorIndexState: {}, + }; + let scheduledTasks = []; + let scheduleCalls = 0; + const runtime = { + appendBatchJournal(targetGraph, entry) { + targetGraph.batchJournal = [...(targetGraph.batchJournal || []), entry]; + }, + applyProcessedHistorySnapshotToGraph(targetGraph, _chat, endFloor) { + targetGraph.historyState ||= {}; + targetGraph.historyState.lastProcessedAssistantFloor = endFloor; + targetGraph.lastProcessedSeq = endFloor; + }, + buildPersistDelta: () => null, + buildExtractionMessages: () => [], + cloneGraphSnapshot: (value) => JSON.parse(JSON.stringify(value)), + computePostProcessArtifacts: (_before, _after, artifacts = []) => artifacts, + console, + createBatchJournalEntry: (_before, _after, options) => ({ + type: "batch", + ...options, + }), + createBatchStatusSkeleton, + ensureCurrentGraphRuntimeState() { + graph.historyState ||= {}; + graph.vectorIndexState ||= {}; + }, + extractMemories: async () => ({ + success: true, + newNodeIds: ["node-bg-maintenance"], + processedRange: [2, 2], + }), + finalizeBatchStatus, + getCurrentGraph: () => graph, + getEmbeddingConfig: () => null, + getExtractionCount: () => 1, + getLastProcessedAssistantFloor: () => -1, + getSettings: () => ({}), + getSchema: () => schema, + handleExtractionSuccess: async (_result, _endIdx, _settings, _signal, status) => { + setBatchStageOutcome(status, "finalize", "success"); + status.backgroundMaintenanceQueued = true; + status.backgroundMaintenanceMode = "balanced"; + status.backgroundMaintenanceTasks = [ + { id: "summary:test", type: "summary", reason: "test" }, + { id: "reflection:test", type: "reflection", reason: "test" }, + ]; + return { + postProcessArtifacts: [], + vectorHashesInserted: [], + vectorStats: { pending: 0 }, + vectorError: "", + batchStatus: finalizeBatchStatus(status, 1), + backgroundMaintenance: [ + { id: "summary:test", type: "summary", reason: "test", payload: {} }, + { id: "reflection:test", type: "reflection", reason: "test", payload: {} }, + ], + backgroundVectorSync: null, + }; + }, + persistExtractionBatchResult: async () => ({ + accepted: true, + revision: 1, + storageTier: "metadata-full", + saveMode: "full", + outcome: "accepted", + }), + scheduleBackgroundMaintenancePostProcess(tasks) { + scheduleCalls += 1; + scheduledTasks = tasks; + return { + queued: true, + id: "post-process:test", + snapshot: { state: "queued", queued: 1 }, + }; + }, + setBatchStageOutcome, + setLastExtractionStatus: () => {}, + shouldAdvanceProcessedHistory: () => true, + throwIfAborted: () => {}, + updateProcessedHistorySnapshot(chat, endFloor) { + graph.historyState.processedChatLength = chat.length; + graph.historyState.lastProcessedAssistantFloor = endFloor; + }, + }; + + const result = await executeExtractionBatchController(runtime, { + chat: [{ is_user: true }, { is_user: false }], + startIdx: 2, + endIdx: 2, + settings: { maintenanceExecutionMode: "balanced" }, + }); + + assert.equal(result.historyAdvanceAllowed, true); + assert.equal(scheduleCalls, 1); + assert.deepEqual( + scheduledTasks.map((task) => task.type), + ["summary", "reflection"], + ); + assert.equal(result.batchStatus.backgroundMaintenanceState, "queued"); + assert.equal(result.effects.backgroundMaintenanceQueue?.queued, true); +} + async function testAutoConsolidationRunsOnHighDuplicateRiskSingleNode() { const harness = await createBatchStageHarness(); const { createBatchStatusSkeleton, handleExtractionSuccess } = harness.result; @@ -7890,7 +8090,9 @@ await testBatchStatusStructuralPartialRemainsRecoverable(); await testBatchStatusSemanticFailureDoesNotHideCoreSuccess(); await testExtractionPostProcessStatusesExposeMaintenancePhases(); await testBalancedModeDefersExtractionVectorSync(); +await testBalancedModeDefersExtractionMaintenancePostProcess(); await testBackgroundVectorSyncScheduledAfterAcceptedPersistence(); +await testBackgroundMaintenanceScheduledAfterAcceptedPersistence(); await testAutoConsolidationRunsOnHighDuplicateRiskSingleNode(); await testAutoConsolidationSkipsLowRiskSingleNode(); await testAutoConsolidationSuppressedForBulkExtractionBatch(); diff --git a/ui/panel.html b/ui/panel.html index 1336e7b..69ce947 100644 --- a/ui/panel.html +++ b/ui/panel.html @@ -260,6 +260,10 @@
+