Optimize bulk rerun extraction consolidation

This commit is contained in:
Youzini-afk
2026-04-29 03:25:29 +08:00
parent 9eadbc0470
commit 6e4ba691a8
4 changed files with 579 additions and 78 deletions

View File

@@ -111,11 +111,20 @@ const chat = [
rollback: [],
manual: [],
extractionStatus: [],
warning: [],
};
const runtime = {
getContext() {
return { chat };
},
getSettings() {
return {
hideOldMessagesEnabled: true,
hideOldMessagesKeepLastN: 2,
hideOldMessagesRenderLimitEnabled: true,
hideOldMessagesRenderLimit: 4,
};
},
getIsExtracting() {
return false;
},
@@ -134,7 +143,9 @@ const chat = [
calls.manual.push({ ...options });
},
toastr: {
warning() {},
warning(message) {
calls.warning.push(String(message || ""));
},
info() {},
},
};
@@ -147,12 +158,18 @@ const chat = [
assert.equal(result.fallbackToLatest, false);
assert.deepEqual(calls.rollback, [6]);
assert.equal(calls.manual[0].lockedEndFloor, 6);
assert.equal(calls.manual[0].suppressIntermediateAutoConsolidation, true);
assert.equal(calls.manual[0].showStartToast, false);
assert.equal(calls.extractionStatus[0]?.text, "重新提取准备中");
assert.match(calls.extractionStatus[0]?.meta || "", /旧楼层隐藏/);
assert.match(calls.extractionStatus[0]?.meta || "", /渲染楼层限制/);
assert.match(calls.warning[0] || "", /解除消息隐藏/);
assert.match(result.visibilityWarning || "", /渲染楼层限制/);
}
{
const statuses = [];
const executeCalls = [];
let lastProcessedAssistantFloor = -1;
const runtime = {
getIsExtracting() {
@@ -192,7 +209,9 @@ const chat = [
setLastExtractionStatus(text, meta, level) {
statuses.push({ text, meta, level });
},
async executeExtractionBatch({ endIdx }) {
async executeExtractionBatch(options = {}) {
executeCalls.push(options);
const endIdx = options.endIdx;
lastProcessedAssistantFloor = endIdx;
return {
success: true,
@@ -224,6 +243,7 @@ const chat = [
taskLabel: "重新提取",
toastTitle: "ST-BME 重新提取",
showStartToast: false,
suppressIntermediateAutoConsolidation: true,
});
assert.equal(statuses[0]?.text, "重新提取中");
@@ -236,6 +256,117 @@ const chat = [
),
);
assert.equal(statuses[statuses.length - 1]?.text, "重新提取完成");
assert.equal(executeCalls.length, 2);
assert.equal(
executeCalls[0]?.postProcessContext?.suppressAutoConsolidation,
true,
);
assert.equal(executeCalls[0]?.postProcessContext?.remainingAssistantTurnsAfterBatch, 1);
assert.equal(executeCalls[1]?.postProcessContext, null);
}
{
const executeCalls = [];
let lastProcessedAssistantFloor = -1;
const assistantTurns = [2, 4, 6, 8];
const runtime = {
getIsExtracting() {
return false;
},
ensureGraphMutationReady() {
return true;
},
async recoverHistoryIfNeeded() {
return true;
},
getCurrentGraph() {
return { historyState: {} };
},
getContext() {
return { chat };
},
getAssistantTurns() {
return assistantTurns;
},
getLastProcessedAssistantFloor() {
return lastProcessedAssistantFloor;
},
getSettings() {
return { extractEvery: 1, bulkAutoConsolidationEveryBatches: 2 };
},
clampInt(value, fallback, min, max) {
const numeric = Number(value);
if (!Number.isFinite(numeric)) return fallback;
return Math.min(max, Math.max(min, Math.trunc(numeric)));
},
setIsExtracting() {},
beginStageAbortController() {
return { signal: null };
},
finishStageAbortController() {},
setLastExtractionStatus() {},
async executeExtractionBatch(options = {}) {
executeCalls.push(options);
lastProcessedAssistantFloor = options.endIdx;
return {
success: true,
result: {
newNodes: 1,
newNodeIds: [`node-${options.endIdx}`],
updatedNodes: 0,
newEdges: 1,
},
effects: {
warnings: [],
},
historyAdvanceAllowed: true,
};
},
isAbortError() {
return false;
},
refreshPanelLiveState() {},
retryPendingGraphPersist: async () => ({ accepted: true }),
toastr: {
info() {},
success() {},
warning() {},
error() {},
},
};
await onManualExtractController(runtime, {
taskLabel: "重新提取",
toastTitle: "ST-BME 重新提取",
showStartToast: false,
suppressIntermediateAutoConsolidation: true,
});
assert.equal(executeCalls.length, 4);
assert.equal(
executeCalls[0]?.postProcessContext?.suppressAutoConsolidation,
true,
);
assert.equal(
executeCalls[1]?.postProcessContext?.suppressAutoConsolidation,
false,
);
assert.deepEqual(
executeCalls[1]?.postProcessContext?.pendingAutoConsolidationNodeIds,
["node-2"],
);
assert.equal(
executeCalls[2]?.postProcessContext?.suppressAutoConsolidation,
true,
);
assert.equal(
executeCalls[3]?.postProcessContext?.suppressAutoConsolidation,
false,
);
assert.deepEqual(
executeCalls[3]?.postProcessContext?.pendingAutoConsolidationNodeIds,
["node-6"],
);
}
{

View File

@@ -281,6 +281,12 @@ function createBatchStageHarness() {
.replace(/^export\s+/gm, "");
const context = {
console,
AbortController,
AbortSignal,
DOMException,
setTimeout,
clearTimeout,
EXTRACTION_VECTOR_SYNC_TIMEOUT_MS: 120000,
result: null,
extractionCount: 0,
currentGraph: null,
@@ -3678,6 +3684,148 @@ async function testAutoConsolidationSkipsLowRiskSingleNode() {
);
}
async function testAutoConsolidationSuppressedForBulkExtractionBatch() {
const harness = await createBatchStageHarness();
const { createBatchStatusSkeleton, handleExtractionSuccess } = harness.result;
harness.currentGraph = {
historyState: { extractionCount: 0 },
vectorIndexState: {},
};
harness.ensureCurrentGraphRuntimeState = () => {
harness.currentGraph.historyState ||= {};
harness.currentGraph.vectorIndexState ||= {};
};
let consolidateCalls = 0;
let gateCalls = 0;
harness.analyzeAutoConsolidationGate = async () => {
gateCalls += 1;
return {
triggered: true,
reason: "should-not-run",
matchedScore: 0.99,
matchedNodeId: "old-bulk",
};
};
harness.consolidateMemories = async () => {
consolidateCalls += 1;
return {
merged: 1,
skipped: 0,
kept: 0,
evolved: 0,
connections: 0,
updates: 0,
};
};
harness.syncVectorState = async () => ({
insertedHashes: [],
stats: { pending: 0 },
});
const batchStatus = createBatchStatusSkeleton({
processedRange: [8, 8],
extractionCountBefore: 0,
});
const effects = await handleExtractionSuccess(
{ newNodeIds: ["node-a", "node-b", "node-c"] },
8,
{
enableConsolidation: true,
consolidationAutoMinNewNodes: 2,
consolidationThreshold: 0.85,
enableAutoCompression: false,
compressionEveryN: 10,
enableSynopsis: false,
enableReflection: false,
enableSleepCycle: false,
synopsisEveryN: 1,
reflectEveryN: 1,
sleepEveryN: 1,
},
undefined,
batchStatus,
{
suppressAutoConsolidation: true,
autoConsolidationSuppressReason: "bulk rerun still draining",
},
);
assert.equal(gateCalls, 0);
assert.equal(consolidateCalls, 0);
assert.equal(effects.batchStatus.consolidationGateTriggered, false);
assert.equal(effects.batchStatus.consolidationGateReason, "bulk rerun still draining");
assert.equal(
effects.batchStatus.stages.structural.artifacts.includes(
"consolidation-skipped",
),
true,
);
}
async function testAutoConsolidationUsesDeferredBulkCandidates() {
const harness = await createBatchStageHarness();
const { createBatchStatusSkeleton, handleExtractionSuccess } = harness.result;
harness.currentGraph = {
historyState: { extractionCount: 0 },
vectorIndexState: {},
};
harness.ensureCurrentGraphRuntimeState = () => {
harness.currentGraph.historyState ||= {};
harness.currentGraph.vectorIndexState ||= {};
};
let candidateIds = [];
harness.consolidateMemories = async ({ newNodeIds = [] } = {}) => {
candidateIds = [...newNodeIds];
return {
merged: 1,
skipped: 0,
kept: 0,
evolved: 0,
connections: 0,
updates: 0,
};
};
harness.syncVectorState = async () => ({
insertedHashes: [],
stats: { pending: 0 },
});
const batchStatus = createBatchStatusSkeleton({
processedRange: [10, 10],
extractionCountBefore: 0,
});
const effects = await handleExtractionSuccess(
{ newNodeIds: ["node-current", "node-old"] },
10,
{
enableConsolidation: true,
consolidationAutoMinNewNodes: 2,
consolidationThreshold: 0.85,
enableAutoCompression: false,
compressionEveryN: 10,
enableSynopsis: false,
enableReflection: false,
enableSleepCycle: false,
synopsisEveryN: 1,
reflectEveryN: 1,
sleepEveryN: 1,
},
undefined,
batchStatus,
{
suppressAutoConsolidation: false,
pendingAutoConsolidationNodeIds: ["node-old", "node-deferred"],
},
);
assert.deepEqual(candidateIds, ["node-old", "node-deferred", "node-current"]);
assert.equal(effects.batchStatus.consolidationGateTriggered, true);
assert.equal(
effects.batchStatus.stages.structural.artifacts.includes("consolidation"),
true,
);
}
async function testAutoCompressionRunsOnlyOnConfiguredInterval() {
const harness = await createBatchStageHarness();
const { createBatchStatusSkeleton, handleExtractionSuccess } = harness.result;
@@ -7510,6 +7658,8 @@ await testBatchStatusSemanticFailureDoesNotHideCoreSuccess();
await testExtractionPostProcessStatusesExposeMaintenancePhases();
await testAutoConsolidationRunsOnHighDuplicateRiskSingleNode();
await testAutoConsolidationSkipsLowRiskSingleNode();
await testAutoConsolidationSuppressedForBulkExtractionBatch();
await testAutoConsolidationUsesDeferredBulkCandidates();
await testAutoCompressionRunsOnlyOnConfiguredInterval();
await testAutoCompressionSkipsWhenNotScheduledOrNoCandidates();
await testBatchStatusFinalizeFailureIsNotCompleteSuccess();