Implement background maintenance vector sync

This commit is contained in:
Youzini-afk
2026-04-29 14:51:51 +08:00
parent 2c4a82d11d
commit 69dc452120
6 changed files with 711 additions and 62 deletions

View File

@@ -22,6 +22,7 @@ import {
registerCoreEventHooksController,
} from "../host/event-binding.js";
import {
executeExtractionBatchController,
onRerollController,
resolveAutoExtractionPlanController,
runExtractionController,
@@ -3524,6 +3525,165 @@ async function testExtractionPostProcessStatusesExposeMaintenancePhases() {
assert.ok(statusTexts.includes("向量同步中"));
}
async function testBalancedModeDefersExtractionVectorSync() {
const harness = await createBatchStageHarness();
const { createBatchStatusSkeleton, handleExtractionSuccess } = harness.result;
let syncCalls = 0;
harness.currentGraph = {
historyState: { extractionCount: 0 },
vectorIndexState: {},
};
harness.ensureCurrentGraphRuntimeState = () => {
harness.currentGraph.historyState ||= {};
harness.currentGraph.vectorIndexState ||= {};
};
harness.syncVectorState = async () => {
syncCalls += 1;
return {
insertedHashes: ["should-not-run"],
stats: { pending: 0 },
};
};
const batchStatus = createBatchStatusSkeleton({
processedRange: [12, 13],
extractionCountBefore: 0,
});
const effects = await handleExtractionSuccess(
{
newNodeIds: ["node-bg"],
processedRange: [12, 13],
},
13,
{
maintenanceExecutionMode: "balanced",
enableConsolidation: false,
enableSynopsis: false,
enableReflection: false,
enableSleepCycle: false,
compressionEveryN: 0,
synopsisEveryN: 1,
reflectEveryN: 1,
sleepEveryN: 1,
},
undefined,
batchStatus,
);
assert.equal(syncCalls, 0);
assert.equal(effects.backgroundVectorSync?.enabled, true);
assert.equal(effects.backgroundVectorSync?.mode, "balanced");
assert.equal(effects.backgroundVectorSync?.range?.start, 12);
assert.equal(effects.backgroundVectorSync?.range?.end, 13);
assert.equal(harness.currentGraph.vectorIndexState.dirty, true);
assert.equal(effects.batchStatus.stages.finalize.outcome, "success");
assert.ok(
effects.batchStatus.stages.finalize.artifacts.includes("vector-sync-queued"),
);
assert.equal(effects.batchStatus.backgroundVectorSyncQueued, true);
}
async function testBackgroundVectorSyncScheduledAfterAcceptedPersistence() {
const graph = {
nodes: [],
edges: [],
historyState: { extractionCount: 0 },
vectorIndexState: {},
};
let scheduledTask = null;
let scheduleCalls = 0;
const runtime = {
appendBatchJournal(targetGraph, entry) {
targetGraph.batchJournal = [...(targetGraph.batchJournal || []), entry];
},
applyProcessedHistorySnapshotToGraph(targetGraph, _chat, endFloor) {
targetGraph.historyState ||= {};
targetGraph.historyState.lastProcessedAssistantFloor = endFloor;
targetGraph.lastProcessedSeq = endFloor;
},
buildPersistDelta: () => null,
buildExtractionMessages: () => [],
cloneGraphSnapshot: (value) => JSON.parse(JSON.stringify(value)),
computePostProcessArtifacts: (_before, _after, artifacts = []) => artifacts,
console,
createBatchJournalEntry: (_before, _after, options) => ({
type: "batch",
...options,
}),
createBatchStatusSkeleton,
ensureCurrentGraphRuntimeState() {
graph.historyState ||= {};
graph.vectorIndexState ||= {};
},
extractMemories: async () => ({
success: true,
newNodeIds: ["node-bg"],
processedRange: [1, 1],
}),
finalizeBatchStatus,
getCurrentGraph: () => graph,
getEmbeddingConfig: () => null,
getExtractionCount: () => 1,
getLastProcessedAssistantFloor: () => -1,
getSettings: () => ({}),
getSchema: () => schema,
handleExtractionSuccess: async (_result, _endIdx, _settings, _signal, status) => {
setBatchStageOutcome(status, "finalize", "success");
return {
postProcessArtifacts: [],
vectorHashesInserted: [],
vectorStats: { pending: 1 },
vectorError: "",
batchStatus: finalizeBatchStatus(status, 1),
backgroundVectorSync: {
enabled: true,
id: "vector-sync:test",
mode: "balanced",
reason: "background-vector-sync-after-extraction",
range: { start: 1, end: 1 },
},
};
},
persistExtractionBatchResult: async () => ({
accepted: true,
revision: 1,
storageTier: "metadata-full",
saveMode: "full",
outcome: "accepted",
}),
scheduleBackgroundVectorSync(task) {
scheduleCalls += 1;
scheduledTask = task;
return {
queued: true,
id: task.id,
snapshot: { state: "queued", queued: 1 },
};
},
setBatchStageOutcome,
setLastExtractionStatus: () => {},
shouldAdvanceProcessedHistory: () => true,
throwIfAborted: () => {},
updateProcessedHistorySnapshot(chat, endFloor) {
graph.historyState.processedChatLength = chat.length;
graph.historyState.lastProcessedAssistantFloor = endFloor;
},
};
const result = await executeExtractionBatchController(runtime, {
chat: [{ is_user: true }, { is_user: false }],
startIdx: 1,
endIdx: 1,
settings: { maintenanceExecutionMode: "balanced" },
});
assert.equal(result.historyAdvanceAllowed, true);
assert.equal(scheduleCalls, 1);
assert.deepEqual(scheduledTask?.range, { start: 1, end: 1 });
assert.equal(result.batchStatus.backgroundVectorSyncState, "queued");
assert.equal(result.effects.backgroundVectorSyncQueue?.queued, true);
}
async function testAutoConsolidationRunsOnHighDuplicateRiskSingleNode() {
const harness = await createBatchStageHarness();
const { createBatchStatusSkeleton, handleExtractionSuccess } = harness.result;
@@ -7729,6 +7889,8 @@ await testReverseJournalRecoveryPlanMixedLegacyAndCurrentRetainsRepairSet();
await testBatchStatusStructuralPartialRemainsRecoverable();
await testBatchStatusSemanticFailureDoesNotHideCoreSuccess();
await testExtractionPostProcessStatusesExposeMaintenancePhases();
await testBalancedModeDefersExtractionVectorSync();
await testBackgroundVectorSyncScheduledAfterAcceptedPersistence();
await testAutoConsolidationRunsOnHighDuplicateRiskSingleNode();
await testAutoConsolidationSkipsLowRiskSingleNode();
await testAutoConsolidationSuppressedForBulkExtractionBatch();

View File

@@ -1,6 +1,7 @@
import assert from "node:assert/strict";
import {
createBackgroundMaintenanceQueue,
getMaintenanceExecutionModeLevel,
normalizeMaintenanceExecutionMode,
resolveConcurrencyConfig,
@@ -87,4 +88,44 @@ assert.equal(
);
}
{
const statuses = [];
const queue = createBackgroundMaintenanceQueue({
maxItems: 1,
maxRetries: 1,
retryBaseMs: 1,
onStatus: (snapshot) => statuses.push(snapshot),
});
let attempts = 0;
const enqueued = queue.enqueue("retry-once", async () => {
attempts += 1;
if (attempts === 1) throw new Error("transient");
return "ok";
});
assert.equal(enqueued.queued, true);
assert.equal(enqueued.snapshot.state, "queued");
await new Promise((resolve) => setTimeout(resolve, 120));
const finalSnapshot = queue.getSnapshot();
assert.equal(attempts, 2);
assert.equal(finalSnapshot.completed, 1);
assert.equal(finalSnapshot.failed, 0);
assert.equal(finalSnapshot.state, "idle");
assert.equal(finalSnapshot.lastTask?.status, "success");
assert.ok(statuses.some((snapshot) => snapshot.state === "running"));
}
{
const queue = createBackgroundMaintenanceQueue({ maxItems: 1 });
const first = queue.enqueue("slow", async () => {
await new Promise((resolve) => setTimeout(resolve, 20));
});
const second = queue.enqueue("overflow", async () => {});
assert.equal(first.queued, true);
assert.equal(second.queued, false);
assert.equal(second.reason, "background-maintenance-queue-full");
assert.equal(queue.getSnapshot().dropped, 1);
}
console.log("runtime-concurrency tests passed");