From 69dc4521203e3383910656f1a092b4219f0565fa Mon Sep 17 00:00:00 2001 From: Youzini-afk <13153778771cx@gmail.com> Date: Wed, 29 Apr 2026 14:51:51 +0800 Subject: [PATCH] Implement background maintenance vector sync --- index.js | 346 ++++++++++++++++++++++----- maintenance/extraction-controller.js | 26 ++ runtime/concurrency.js | 187 +++++++++++++++ tests/p0-regressions.mjs | 162 +++++++++++++ tests/runtime-concurrency.mjs | 41 ++++ ui/ui-status.js | 11 + 6 files changed, 711 insertions(+), 62 deletions(-) diff --git a/index.js b/index.js index e9260e0..ced6311 100644 --- a/index.js +++ b/index.js @@ -256,7 +256,10 @@ import { getPersistedSettingsSnapshot, mergePersistedSettings, } from "./runtime/settings-defaults.js"; -import { resolveConcurrencyConfig } from "./runtime/concurrency.js"; +import { + createBackgroundMaintenanceQueue, + resolveConcurrencyConfig, +} from "./runtime/concurrency.js"; import { createDefaultAuthorityCapabilityState, normalizeAuthoritySettings, @@ -1293,6 +1296,10 @@ let graphPersistenceState = createGraphPersistenceState(); let authorityCapabilityState = createDefaultAuthorityCapabilityState(); let authorityBrowserState = createAuthorityBrowserState(); let authorityProbePromise = null; +const backgroundMaintenanceQueue = + typeof createBackgroundMaintenanceQueue === "function" + ? createBackgroundMaintenanceQueue() + : null; const lastStatusToastAt = {}; let pendingRecallSendIntent = createRecallInputRecord(); let lastRecallSentUserMessage = createRecallInputRecord(); @@ -1805,6 +1812,20 @@ function getGraphPersistenceLiveState() { persistMismatchReason: String(graphPersistenceState.persistMismatchReason || ""), commitMarker: cloneRuntimeDebugValue(liveCommitMarker, null), restoreLock, + backgroundMaintenance: cloneRuntimeDebugValue( + graphPersistenceState.backgroundMaintenance, + { + state: "idle", + queued: 0, + activeId: "", + activeName: "", + completed: 0, + failed: 0, + dropped: 0, + lastTask: null, + updatedAt: 0, + }, + ), queuedPersistMode: graphPersistenceState.queuedPersistMode, queuedPersistRotateIntegrity: graphPersistenceState.queuedPersistRotateIntegrity, @@ -16146,6 +16167,107 @@ function shouldAdvanceProcessedHistory(batchStatus) { ); } +function resolveMaintenancePostProcessConcurrency(settings = {}) { + if (typeof resolveConcurrencyConfig === "function") { + try { + return resolveConcurrencyConfig(settings); + } catch { + } + } + const mode = String(settings?.maintenanceExecutionMode || "strict") + .trim() + .toLowerCase(); + const strict = mode !== "balanced" && mode !== "fast"; + return { + mode: strict ? "strict" : mode, + level: strict ? 1 : mode === "balanced" ? 2 : 3, + backgroundMaintenanceMaxRetries: Math.max( + 0, + Math.min(10, Math.floor(Number(settings?.backgroundMaintenanceMaxRetries ?? 2)) || 0), + ), + backgroundMaintenanceRetryBaseMs: Math.max( + 50, + Math.min(60000, Math.floor(Number(settings?.backgroundMaintenanceRetryBaseMs ?? 800)) || 800), + ), + backgroundMaintenanceMaxQueueItems: Math.max( + 1, + Math.min(256, Math.floor(Number(settings?.backgroundMaintenanceMaxQueueItems ?? 24)) || 24), + ), + }; +} + +function shouldDeferExtractionVectorSync(settings = {}) { + return resolveMaintenancePostProcessConcurrency(settings).mode !== "strict"; +} + +function updateBackgroundMaintenanceQueueState(snapshot = null) { + const normalized = + snapshot && typeof snapshot === "object" && !Array.isArray(snapshot) + ? { + state: String(snapshot.state || "idle"), + queued: Math.max(0, Math.floor(Number(snapshot.queued || 0)) || 0), + activeId: String(snapshot.activeId || ""), + activeName: String(snapshot.activeName || ""), + completed: Math.max(0, Math.floor(Number(snapshot.completed || 0)) || 0), + failed: Math.max(0, Math.floor(Number(snapshot.failed || 0)) || 0), + dropped: Math.max(0, Math.floor(Number(snapshot.dropped || 0)) || 0), + lastTask: + snapshot.lastTask && typeof snapshot.lastTask === "object" + ? { ...snapshot.lastTask } + : null, + updatedAt: Number(snapshot.updatedAt || Date.now()) || Date.now(), + } + : { + state: "idle", + queued: 0, + activeId: "", + activeName: "", + completed: 0, + failed: 0, + dropped: 0, + lastTask: null, + updatedAt: Date.now(), + }; + if (typeof updateGraphPersistenceState === "function") { + updateGraphPersistenceState({ backgroundMaintenance: normalized }); + } + if (typeof recordMaintenanceDebugSnapshot === "function") { + recordMaintenanceDebugSnapshot({ backgroundQueue: normalized }); + } + if (typeof refreshPanelLiveState === "function") { + refreshPanelLiveState(); + } + return normalized; +} + +function enqueueBackgroundMaintenanceTask(name, run, settings = {}, options = {}) { + const concurrency = resolveMaintenancePostProcessConcurrency(settings); + const queue = + typeof backgroundMaintenanceQueue !== "undefined" + ? backgroundMaintenanceQueue + : null; + if (!queue || typeof queue.enqueue !== "function") { + return { + queued: false, + reason: "background-maintenance-queue-unavailable", + snapshot: updateBackgroundMaintenanceQueueState(null), + }; + } + if (typeof queue.configure === "function") { + queue.configure({ + maxItems: concurrency.backgroundMaintenanceMaxQueueItems, + maxRetries: concurrency.backgroundMaintenanceMaxRetries, + retryBaseMs: concurrency.backgroundMaintenanceRetryBaseMs, + onStatus: updateBackgroundMaintenanceQueueState, + }); + } + return queue.enqueue(name, run, { + maxRetries: concurrency.backgroundMaintenanceMaxRetries, + retryBaseMs: concurrency.backgroundMaintenanceRetryBaseMs, + ...(options || {}), + }); +} + function computePostProcessArtifacts( beforeSnapshot, afterSnapshot, @@ -16262,6 +16384,53 @@ async function syncVectorState({ } } +function scheduleBackgroundVectorSync(task = null, settings = {}) { + const normalizedTask = + task && typeof task === "object" && !Array.isArray(task) ? task : {}; + const range = + normalizedTask.range && + Number.isFinite(Number(normalizedTask.range.start)) && + Number.isFinite(Number(normalizedTask.range.end)) + ? { + start: Math.floor(Number(normalizedTask.range.start)), + end: Math.floor(Number(normalizedTask.range.end)), + } + : null; + const reason = + String(normalizedTask.reason || "background-vector-sync").trim() || + "background-vector-sync"; + const mode = + String( + normalizedTask.mode || + resolveMaintenancePostProcessConcurrency(settings).mode || + "balanced", + ).trim() || "balanced"; + return enqueueBackgroundMaintenanceTask( + "vector-sync", + async () => { + setLastVectorStatus( + "后台向量同步中", + `${mode} 模式 · 正在同步提取后的向量索引`, + "running", + { syncRuntime: false }, + ); + const result = await syncVectorState({ range }); + if (result?.aborted) { + throw createAbortError(result.error || "后台向量同步已终止"); + } + if (result?.error) { + throw new Error(result.error); + } + saveGraphToChat({ reason }); + return result; + }, + settings, + { + id: String(normalizedTask.id || ""), + }, + ); +} + async function ensureVectorReadyIfNeeded( reason = "vector-ready-check", signal = undefined, @@ -19979,70 +20148,121 @@ async function handleExtractionSuccess( } let vectorSync = null; - try { - updateExtractionPostProcessStatus( - "向量同步中", - "正在同步本批提取后的向量索引", - ); - const vectorSyncTimeoutController = new AbortController(); - const vectorSyncTimeout = setTimeout( - () => vectorSyncTimeoutController.abort( - new DOMException( - `向量同步超时 (${Math.round(EXTRACTION_VECTOR_SYNC_TIMEOUT_MS / 1000)}s)`, - "AbortError", - ), - ), - EXTRACTION_VECTOR_SYNC_TIMEOUT_MS, - ); - let vectorSyncSignal = vectorSyncTimeoutController.signal; - if (signal) { - try { - if (typeof AbortSignal.any === "function") { - vectorSyncSignal = AbortSignal.any([signal, vectorSyncTimeoutController.signal]); - } - } catch {} - } - try { - vectorSync = await syncVectorState({ signal: vectorSyncSignal }); - } finally { - clearTimeout(vectorSyncTimeout); - } - } catch (error) { - if (isAbortError(error)) { - const isVectorSyncTimeout = error?.name === "AbortError" && - typeof error?.message === "string" && - error.message.includes("向量同步超时"); - if (!isVectorSyncTimeout) throw error; - } - const message = error?.message || String(error) || "向量同步阶段失败"; - setBatchStageOutcome( - status, - "finalize", - "failed", - `向量同步失败: ${message}`, - ); - return { - postProcessArtifacts, - vectorHashesInserted: [], - vectorStats: getVectorIndexStats(currentGraph), - vectorError: message, - warnings: status.warnings, - batchStatus: finalizeBatchStatus(status, extractionCount), + let backgroundVectorSync = null; + const vectorSyncRangeSource = Array.isArray(result?.processedRange) + ? result.processedRange + : Array.isArray(status?.processedRange) + ? status.processedRange + : [endIdx, endIdx]; + const vectorSyncRange = { + start: Math.min( + Number(vectorSyncRangeSource[0] ?? endIdx), + Number(vectorSyncRangeSource[1] ?? endIdx), + ), + end: Math.max( + Number(vectorSyncRangeSource[0] ?? endIdx), + Number(vectorSyncRangeSource[1] ?? endIdx), + ), + }; + if (shouldDeferExtractionVectorSync(settings)) { + const concurrency = resolveMaintenancePostProcessConcurrency(settings); + ensureCurrentGraphRuntimeState(); + currentGraph.vectorIndexState ||= {}; + currentGraph.vectorIndexState.dirty = true; + currentGraph.vectorIndexState.dirtyReason = "background-vector-sync-queued"; + currentGraph.vectorIndexState.lastWarning = + `${concurrency.mode} 模式已将本批向量同步放入后台队列`; + backgroundVectorSync = { + enabled: true, + mode: concurrency.mode, + id: `vector-sync:${Date.now()}:${endIdx}`, + reason: "background-vector-sync-after-extraction", + range: vectorSyncRange, }; - } - - if (vectorSync?.aborted) { - throw createAbortError(vectorSync.error || "提取已终止"); - } - if (vectorSync?.error) { - setBatchStageOutcome( - status, - "finalize", - "failed", - `向量同步失败: ${vectorSync.error}`, + status.backgroundVectorSyncQueued = true; + status.backgroundVectorSyncMode = concurrency.mode; + status.backgroundVectorSyncTaskId = backgroundVectorSync.id; + updateExtractionPostProcessStatus( + "向量同步已排队", + `${concurrency.mode} 模式:本批向量将在持久化后后台同步`, ); - } else { + if (typeof setLastVectorStatus === "function") { + setLastVectorStatus( + "后台向量已排队", + `${concurrency.mode} 模式 · 等待批次持久化确认`, + "running", + { syncRuntime: false }, + ); + } + pushBatchStageArtifact(status, "finalize", "vector-sync-queued"); setBatchStageOutcome(status, "finalize", "success"); + } else { + try { + updateExtractionPostProcessStatus( + "向量同步中", + "正在同步本批提取后的向量索引", + ); + const vectorSyncTimeoutController = new AbortController(); + const vectorSyncTimeout = setTimeout( + () => vectorSyncTimeoutController.abort( + new DOMException( + `向量同步超时 (${Math.round(EXTRACTION_VECTOR_SYNC_TIMEOUT_MS / 1000)}s)`, + "AbortError", + ), + ), + EXTRACTION_VECTOR_SYNC_TIMEOUT_MS, + ); + let vectorSyncSignal = vectorSyncTimeoutController.signal; + if (signal) { + try { + if (typeof AbortSignal.any === "function") { + vectorSyncSignal = AbortSignal.any([signal, vectorSyncTimeoutController.signal]); + } + } catch {} + } + try { + vectorSync = await syncVectorState({ signal: vectorSyncSignal }); + } finally { + clearTimeout(vectorSyncTimeout); + } + } catch (error) { + if (isAbortError(error)) { + const isVectorSyncTimeout = error?.name === "AbortError" && + typeof error?.message === "string" && + error.message.includes("向量同步超时"); + if (!isVectorSyncTimeout) throw error; + } + const message = error?.message || String(error) || "向量同步阶段失败"; + setBatchStageOutcome( + status, + "finalize", + "failed", + `向量同步失败: ${message}`, + ); + return { + postProcessArtifacts, + vectorHashesInserted: [], + vectorStats: getVectorIndexStats(currentGraph), + vectorError: message, + warnings: status.warnings, + batchStatus: finalizeBatchStatus(status, extractionCount), + backgroundVectorSync: null, + }; + } + + if (vectorSync?.aborted) { + throw createAbortError(vectorSync.error || "提取已终止"); + } + if (vectorSync?.error) { + setBatchStageOutcome( + status, + "finalize", + "failed", + `向量同步失败: ${vectorSync.error}`, + ); + } else { + setBatchStageOutcome(status, "finalize", "success"); + } } status.maintenanceJournalSize = @@ -20064,6 +20284,7 @@ async function handleExtractionSuccess( vectorError: vectorSync?.error || "", warnings: status.warnings, batchStatus: finalizeBatchStatus(status, extractionCount), + backgroundVectorSync, }; } @@ -20375,6 +20596,7 @@ async function executeExtractionBatch({ getSchema, handleExtractionSuccess, persistExtractionBatchResult, + scheduleBackgroundVectorSync, setBatchStageOutcome, setLastExtractionStatus, shouldAdvanceProcessedHistory, diff --git a/maintenance/extraction-controller.js b/maintenance/extraction-controller.js index 5889afd..458bdbf 100644 --- a/maintenance/extraction-controller.js +++ b/maintenance/extraction-controller.js @@ -909,6 +909,31 @@ export async function executeExtractionBatchController( const persistence = normalizePersistenceStateRecord(persistResult); batchStatusRef.persistence = persistence; batchStatusRef.historyAdvanceAllowed = persistence.accepted === true; + let backgroundVectorSyncQueue = null; + if ( + persistence.accepted === true && + effects?.backgroundVectorSync?.enabled === true && + typeof runtime.scheduleBackgroundVectorSync === "function" + ) { + backgroundVectorSyncQueue = runtime.scheduleBackgroundVectorSync( + effects.backgroundVectorSync, + settings, + ); + batchStatusRef.backgroundVectorSyncState = + backgroundVectorSyncQueue?.queued === true ? "queued" : "queue-failed"; + batchStatusRef.backgroundVectorSyncQueue = + cloneSerializable(backgroundVectorSyncQueue, backgroundVectorSyncQueue); + if (backgroundVectorSyncQueue?.queued !== true) { + runtime.setBatchStageOutcome( + batchStatusRef, + "finalize", + "partial", + `后台向量同步入队失败: ${backgroundVectorSyncQueue?.reason || "queue-failed"}`, + ); + } + } else if (effects?.backgroundVectorSync?.enabled === true) { + batchStatusRef.backgroundVectorSyncState = "blocked-by-persistence"; + } const finalizedBatchStatus = runtime.finalizeBatchStatus( batchStatusRef, runtime.getExtractionCount(), @@ -960,6 +985,7 @@ export async function executeExtractionBatchController( effects: { ...(effects || {}), persistResult, + backgroundVectorSyncQueue, }, batchStatus: finalizedBatchStatus, persistResult, diff --git a/runtime/concurrency.js b/runtime/concurrency.js index 98b7a34..f83be2e 100644 --- a/runtime/concurrency.js +++ b/runtime/concurrency.js @@ -132,3 +132,190 @@ export async function runLimited( await Promise.all(workers); return preserveOrder ? results : results.filter((item) => item !== undefined); } + +function delay(ms) { + const safeMs = Math.max(0, Math.floor(Number(ms) || 0)); + if (safeMs <= 0) return Promise.resolve(); + return new Promise((resolve) => setTimeout(resolve, safeMs)); +} + +function buildQueueSnapshot(queueState) { + return { + state: queueState.active + ? "running" + : queueState.queue.length > 0 + ? "queued" + : "idle", + queued: queueState.queue.length, + activeId: queueState.active?.id || "", + activeName: queueState.active?.name || "", + completed: queueState.completed, + failed: queueState.failed, + dropped: queueState.dropped, + lastTask: queueState.lastTask ? { ...queueState.lastTask } : null, + updatedAt: queueState.updatedAt, + }; +} + +export function createBackgroundMaintenanceQueue(options = {}) { + const queueState = { + queue: [], + active: null, + completed: 0, + failed: 0, + dropped: 0, + lastTask: null, + updatedAt: Date.now(), + nextId: 1, + draining: false, + maxItems: clampInt(options.maxItems, 24, 1, 256), + maxRetries: clampInt(options.maxRetries, 2, 0, 10), + retryBaseMs: clampInt(options.retryBaseMs, 800, 50, 60000), + onStatus: typeof options.onStatus === "function" ? options.onStatus : null, + }; + + function emitStatus() { + queueState.updatedAt = Date.now(); + const snapshot = buildQueueSnapshot(queueState); + try { + queueState.onStatus?.(snapshot); + } catch { + } + return snapshot; + } + + async function runTask(item) { + queueState.active = item; + item.status = "running"; + item.startedAt = Date.now(); + emitStatus(); + while (item.attempts <= item.maxRetries) { + try { + item.attempts += 1; + const result = await item.run({ + attempt: item.attempts, + maxRetries: item.maxRetries, + id: item.id, + name: item.name, + }); + item.status = "success"; + item.finishedAt = Date.now(); + item.result = result; + queueState.completed += 1; + queueState.lastTask = { + id: item.id, + name: item.name, + status: item.status, + attempts: item.attempts, + error: "", + finishedAt: item.finishedAt, + }; + return result; + } catch (error) { + if (error?.name === "AbortError") { + item.status = "aborted"; + item.error = error?.message || String(error); + break; + } + item.error = error?.message || String(error) || "background-task-failed"; + if (item.attempts > item.maxRetries) { + item.status = "failed"; + break; + } + item.status = "retrying"; + emitStatus(); + await delay(item.retryBaseMs * Math.max(1, item.attempts)); + item.status = "running"; + emitStatus(); + } + } + item.finishedAt = Date.now(); + queueState.failed += 1; + queueState.lastTask = { + id: item.id, + name: item.name, + status: item.status, + attempts: item.attempts, + error: item.error || "", + finishedAt: item.finishedAt, + }; + return null; + } + + async function drain() { + if (queueState.draining) return; + queueState.draining = true; + try { + while (queueState.queue.length > 0) { + const item = queueState.queue.shift(); + await runTask(item); + queueState.active = null; + emitStatus(); + } + } finally { + queueState.active = null; + queueState.draining = false; + emitStatus(); + } + } + + return { + configure(nextOptions = {}) { + queueState.maxItems = clampInt(nextOptions.maxItems, queueState.maxItems, 1, 256); + queueState.maxRetries = clampInt(nextOptions.maxRetries, queueState.maxRetries, 0, 10); + queueState.retryBaseMs = clampInt(nextOptions.retryBaseMs, queueState.retryBaseMs, 50, 60000); + if (typeof nextOptions.onStatus === "function") { + queueState.onStatus = nextOptions.onStatus; + } + return emitStatus(); + }, + enqueue(name, run, taskOptions = {}) { + if (typeof run !== "function") { + throw new TypeError("background maintenance task must be a function"); + } + const occupiedSlots = queueState.queue.length + (queueState.active ? 1 : 0); + if (occupiedSlots >= queueState.maxItems) { + queueState.dropped += 1; + const dropped = { + id: "", + name: String(name || "background-task"), + status: "dropped", + attempts: 0, + error: "background-maintenance-queue-full", + finishedAt: Date.now(), + }; + queueState.lastTask = dropped; + emitStatus(); + return { + queued: false, + reason: "background-maintenance-queue-full", + snapshot: buildQueueSnapshot(queueState), + }; + } + const item = { + id: String(taskOptions.id || `bg-${Date.now()}-${queueState.nextId++}`), + name: String(name || taskOptions.name || "background-task"), + run, + attempts: 0, + maxRetries: clampInt(taskOptions.maxRetries, queueState.maxRetries, 0, 10), + retryBaseMs: clampInt(taskOptions.retryBaseMs, queueState.retryBaseMs, 50, 60000), + status: "queued", + createdAt: Date.now(), + }; + queueState.queue.push(item); + const snapshot = emitStatus(); + void drain(); + return { + queued: true, + id: item.id, + snapshot, + }; + }, + getSnapshot() { + return buildQueueSnapshot(queueState); + }, + get size() { + return queueState.queue.length + (queueState.active ? 1 : 0); + }, + }; +} diff --git a/tests/p0-regressions.mjs b/tests/p0-regressions.mjs index dc5cd65..a34b69d 100644 --- a/tests/p0-regressions.mjs +++ b/tests/p0-regressions.mjs @@ -22,6 +22,7 @@ import { registerCoreEventHooksController, } from "../host/event-binding.js"; import { + executeExtractionBatchController, onRerollController, resolveAutoExtractionPlanController, runExtractionController, @@ -3524,6 +3525,165 @@ async function testExtractionPostProcessStatusesExposeMaintenancePhases() { assert.ok(statusTexts.includes("向量同步中")); } +async function testBalancedModeDefersExtractionVectorSync() { + const harness = await createBatchStageHarness(); + const { createBatchStatusSkeleton, handleExtractionSuccess } = harness.result; + let syncCalls = 0; + harness.currentGraph = { + historyState: { extractionCount: 0 }, + vectorIndexState: {}, + }; + harness.ensureCurrentGraphRuntimeState = () => { + harness.currentGraph.historyState ||= {}; + harness.currentGraph.vectorIndexState ||= {}; + }; + harness.syncVectorState = async () => { + syncCalls += 1; + return { + insertedHashes: ["should-not-run"], + stats: { pending: 0 }, + }; + }; + + const batchStatus = createBatchStatusSkeleton({ + processedRange: [12, 13], + extractionCountBefore: 0, + }); + const effects = await handleExtractionSuccess( + { + newNodeIds: ["node-bg"], + processedRange: [12, 13], + }, + 13, + { + maintenanceExecutionMode: "balanced", + enableConsolidation: false, + enableSynopsis: false, + enableReflection: false, + enableSleepCycle: false, + compressionEveryN: 0, + synopsisEveryN: 1, + reflectEveryN: 1, + sleepEveryN: 1, + }, + undefined, + batchStatus, + ); + + assert.equal(syncCalls, 0); + assert.equal(effects.backgroundVectorSync?.enabled, true); + assert.equal(effects.backgroundVectorSync?.mode, "balanced"); + assert.equal(effects.backgroundVectorSync?.range?.start, 12); + assert.equal(effects.backgroundVectorSync?.range?.end, 13); + assert.equal(harness.currentGraph.vectorIndexState.dirty, true); + assert.equal(effects.batchStatus.stages.finalize.outcome, "success"); + assert.ok( + effects.batchStatus.stages.finalize.artifacts.includes("vector-sync-queued"), + ); + assert.equal(effects.batchStatus.backgroundVectorSyncQueued, true); +} + +async function testBackgroundVectorSyncScheduledAfterAcceptedPersistence() { + const graph = { + nodes: [], + edges: [], + historyState: { extractionCount: 0 }, + vectorIndexState: {}, + }; + let scheduledTask = null; + let scheduleCalls = 0; + const runtime = { + appendBatchJournal(targetGraph, entry) { + targetGraph.batchJournal = [...(targetGraph.batchJournal || []), entry]; + }, + applyProcessedHistorySnapshotToGraph(targetGraph, _chat, endFloor) { + targetGraph.historyState ||= {}; + targetGraph.historyState.lastProcessedAssistantFloor = endFloor; + targetGraph.lastProcessedSeq = endFloor; + }, + buildPersistDelta: () => null, + buildExtractionMessages: () => [], + cloneGraphSnapshot: (value) => JSON.parse(JSON.stringify(value)), + computePostProcessArtifacts: (_before, _after, artifacts = []) => artifacts, + console, + createBatchJournalEntry: (_before, _after, options) => ({ + type: "batch", + ...options, + }), + createBatchStatusSkeleton, + ensureCurrentGraphRuntimeState() { + graph.historyState ||= {}; + graph.vectorIndexState ||= {}; + }, + extractMemories: async () => ({ + success: true, + newNodeIds: ["node-bg"], + processedRange: [1, 1], + }), + finalizeBatchStatus, + getCurrentGraph: () => graph, + getEmbeddingConfig: () => null, + getExtractionCount: () => 1, + getLastProcessedAssistantFloor: () => -1, + getSettings: () => ({}), + getSchema: () => schema, + handleExtractionSuccess: async (_result, _endIdx, _settings, _signal, status) => { + setBatchStageOutcome(status, "finalize", "success"); + return { + postProcessArtifacts: [], + vectorHashesInserted: [], + vectorStats: { pending: 1 }, + vectorError: "", + batchStatus: finalizeBatchStatus(status, 1), + backgroundVectorSync: { + enabled: true, + id: "vector-sync:test", + mode: "balanced", + reason: "background-vector-sync-after-extraction", + range: { start: 1, end: 1 }, + }, + }; + }, + persistExtractionBatchResult: async () => ({ + accepted: true, + revision: 1, + storageTier: "metadata-full", + saveMode: "full", + outcome: "accepted", + }), + scheduleBackgroundVectorSync(task) { + scheduleCalls += 1; + scheduledTask = task; + return { + queued: true, + id: task.id, + snapshot: { state: "queued", queued: 1 }, + }; + }, + setBatchStageOutcome, + setLastExtractionStatus: () => {}, + shouldAdvanceProcessedHistory: () => true, + throwIfAborted: () => {}, + updateProcessedHistorySnapshot(chat, endFloor) { + graph.historyState.processedChatLength = chat.length; + graph.historyState.lastProcessedAssistantFloor = endFloor; + }, + }; + + const result = await executeExtractionBatchController(runtime, { + chat: [{ is_user: true }, { is_user: false }], + startIdx: 1, + endIdx: 1, + settings: { maintenanceExecutionMode: "balanced" }, + }); + + assert.equal(result.historyAdvanceAllowed, true); + assert.equal(scheduleCalls, 1); + assert.deepEqual(scheduledTask?.range, { start: 1, end: 1 }); + assert.equal(result.batchStatus.backgroundVectorSyncState, "queued"); + assert.equal(result.effects.backgroundVectorSyncQueue?.queued, true); +} + async function testAutoConsolidationRunsOnHighDuplicateRiskSingleNode() { const harness = await createBatchStageHarness(); const { createBatchStatusSkeleton, handleExtractionSuccess } = harness.result; @@ -7729,6 +7889,8 @@ await testReverseJournalRecoveryPlanMixedLegacyAndCurrentRetainsRepairSet(); await testBatchStatusStructuralPartialRemainsRecoverable(); await testBatchStatusSemanticFailureDoesNotHideCoreSuccess(); await testExtractionPostProcessStatusesExposeMaintenancePhases(); +await testBalancedModeDefersExtractionVectorSync(); +await testBackgroundVectorSyncScheduledAfterAcceptedPersistence(); await testAutoConsolidationRunsOnHighDuplicateRiskSingleNode(); await testAutoConsolidationSkipsLowRiskSingleNode(); await testAutoConsolidationSuppressedForBulkExtractionBatch(); diff --git a/tests/runtime-concurrency.mjs b/tests/runtime-concurrency.mjs index eb02746..5374514 100644 --- a/tests/runtime-concurrency.mjs +++ b/tests/runtime-concurrency.mjs @@ -1,6 +1,7 @@ import assert from "node:assert/strict"; import { + createBackgroundMaintenanceQueue, getMaintenanceExecutionModeLevel, normalizeMaintenanceExecutionMode, resolveConcurrencyConfig, @@ -87,4 +88,44 @@ assert.equal( ); } +{ + const statuses = []; + const queue = createBackgroundMaintenanceQueue({ + maxItems: 1, + maxRetries: 1, + retryBaseMs: 1, + onStatus: (snapshot) => statuses.push(snapshot), + }); + let attempts = 0; + const enqueued = queue.enqueue("retry-once", async () => { + attempts += 1; + if (attempts === 1) throw new Error("transient"); + return "ok"; + }); + + assert.equal(enqueued.queued, true); + assert.equal(enqueued.snapshot.state, "queued"); + await new Promise((resolve) => setTimeout(resolve, 120)); + const finalSnapshot = queue.getSnapshot(); + assert.equal(attempts, 2); + assert.equal(finalSnapshot.completed, 1); + assert.equal(finalSnapshot.failed, 0); + assert.equal(finalSnapshot.state, "idle"); + assert.equal(finalSnapshot.lastTask?.status, "success"); + assert.ok(statuses.some((snapshot) => snapshot.state === "running")); +} + +{ + const queue = createBackgroundMaintenanceQueue({ maxItems: 1 }); + const first = queue.enqueue("slow", async () => { + await new Promise((resolve) => setTimeout(resolve, 20)); + }); + const second = queue.enqueue("overflow", async () => {}); + + assert.equal(first.queued, true); + assert.equal(second.queued, false); + assert.equal(second.reason, "background-maintenance-queue-full"); + assert.equal(queue.getSnapshot().dropped, 1); +} + console.log("runtime-concurrency tests passed"); diff --git a/ui/ui-status.js b/ui/ui-status.js index e8d2a13..104c68f 100644 --- a/ui/ui-status.js +++ b/ui/ui-status.js @@ -206,6 +206,17 @@ export function createGraphPersistenceState() { authorityDiagnosticsLastPruneError: "", localStoreFormatVersion: 1, localStoreMigrationState: "idle", + backgroundMaintenance: { + state: "idle", + queued: 0, + activeId: "", + activeName: "", + completed: 0, + failed: 0, + dropped: 0, + lastTask: null, + updatedAt: 0, + }, opfsWriteLockState: { active: false, queueDepth: 0,