From 4a1560318a7628999aab59e25075f07c4820d9e7 Mon Sep 17 00:00:00 2001 From: youzini Date: Sat, 30 May 2026 14:06:21 +0000 Subject: [PATCH] refactor(persistence): route state updates through events --- index.js | 12 ++++---- sync/persistence-reducer.js | 45 ++++++++++++++++++++++++++++-- tests/graph-persistence.mjs | 4 +++ tests/persistence-reducer.mjs | 52 +++++++++++++++++++++++++++++++++++ 4 files changed, 105 insertions(+), 8 deletions(-) diff --git a/index.js b/index.js index 7e4e0dc..f7df8c8 100644 --- a/index.js +++ b/index.js @@ -66,11 +66,11 @@ import { repairLegacyLastBatchPersistenceStatus, } from "./sync/legacy-persistence-repair.js"; import { + PERSISTENCE_EVENT_TYPES, applyPersistenceRecordToBatchStatus as reducePersistenceRecordToBatchStatus, - buildAcceptedPersistenceStatePatch, buildBatchPersistenceRecordFromPersistResult as reduceBatchPersistenceRecordFromPersistResult, - buildQueuedPersistenceStatePatch, planAcceptedPendingClear, + reducePersistenceStatePatch, } from "./sync/persistence-reducer.js"; import { buildExtractionMessages, @@ -15105,8 +15105,8 @@ function applyAcceptedPendingPersistState( if (persistenceRecord.accepted === true) { updateGraphPersistenceState( - buildAcceptedPersistenceStatePatch({ - currentState: graphPersistenceState, + reducePersistenceStatePatch(graphPersistenceState, { + type: PERSISTENCE_EVENT_TYPES.ACCEPTED, persistenceRecord, clearQueued: false, }), @@ -15368,8 +15368,8 @@ function queueGraphPersist( } updateGraphPersistenceState( - buildQueuedPersistenceStatePatch({ - currentState: graphPersistenceState, + reducePersistenceStatePatch(graphPersistenceState, { + type: PERSISTENCE_EVENT_TYPES.QUEUED, reason, revision: normalizedRevision, chatId: queuedChatId, diff --git a/sync/persistence-reducer.js b/sync/persistence-reducer.js index 6e4a163..bcffb65 100644 --- a/sync/persistence-reducer.js +++ b/sync/persistence-reducer.js @@ -1,7 +1,8 @@ // ST-BME persistence reducer core. // -// Pure helpers only: no IO, no graph mutation, no UI side effects. Phase 2 -// centralizes accepted/queued/pending invariants while leaving durable routing +// Pure helpers only: no IO, no graph mutation, no UI side effects. +// Phase 2 centralized accepted/queued/pending invariants; Phase 5 routes +// call sites through explicit events while leaving durable routing // (IndexedDB/OPFS/Authority/Luker) in the existing orchestration layer. import { @@ -17,6 +18,11 @@ const SAVED_BATCH_ACCEPTED_TIERS = new Set([ "luker-chat-state", ]); +export const PERSISTENCE_EVENT_TYPES = Object.freeze({ + ACCEPTED: "accepted", + QUEUED: "queued", +}); + function normalizeRevision(value = 0) { const numeric = Number(value || 0); if (!Number.isFinite(numeric) || numeric <= 0) return 0; @@ -142,3 +148,38 @@ export function buildQueuedPersistenceStatePatch({ export function planAcceptedPendingClear(options = {}) { return planAcceptedPendingPersistenceRepair(options); } + +export function reducePersistenceStatePatch(currentState = null, event = null) { + const type = String(event?.type || "").trim(); + switch (type) { + case PERSISTENCE_EVENT_TYPES.ACCEPTED: + return buildAcceptedPersistenceStatePatch({ + currentState, + persistenceRecord: event.persistenceRecord, + acceptedRevision: event.acceptedRevision, + acceptedStorageTier: event.acceptedStorageTier, + acceptedBy: event.acceptedBy, + clearQueued: event.clearQueued !== false, + }); + + case PERSISTENCE_EVENT_TYPES.QUEUED: + return buildQueuedPersistenceStatePatch({ + currentState, + reason: event.reason, + revision: event.revision, + chatId: event.chatId, + immediate: event.immediate === true, + recoverableTier: event.recoverableTier, + }); + + default: + return {}; + } +} + +export function reducePersistenceState(currentState = null, event = null) { + return { + ...(currentState && typeof currentState === "object" ? currentState : {}), + ...reducePersistenceStatePatch(currentState, event), + }; +} diff --git a/tests/graph-persistence.mjs b/tests/graph-persistence.mjs index 090e120..8566ce1 100644 --- a/tests/graph-persistence.mjs +++ b/tests/graph-persistence.mjs @@ -137,11 +137,13 @@ import { repairLegacyLastBatchPersistenceStatus, } from "../sync/legacy-persistence-repair.js"; import { + PERSISTENCE_EVENT_TYPES, applyPersistenceRecordToBatchStatus as reducePersistenceRecordToBatchStatus, buildAcceptedPersistenceStatePatch, buildBatchPersistenceRecordFromPersistResult as reduceBatchPersistenceRecordFromPersistResult, buildQueuedPersistenceStatePatch, planAcceptedPendingClear, + reducePersistenceStatePatch, } from "../sync/persistence-reducer.js"; import { clampFloat, @@ -730,11 +732,13 @@ async function createGraphPersistenceHarness({ isRecoveryOnlyLegacyPersistenceTier, planAcceptedPendingPersistenceRepair, repairLegacyLastBatchPersistenceStatus, + PERSISTENCE_EVENT_TYPES, reducePersistenceRecordToBatchStatus, buildAcceptedPersistenceStatePatch, reduceBatchPersistenceRecordFromPersistResult, buildQueuedPersistenceStatePatch, planAcceptedPendingClear, + reducePersistenceStatePatch, migrateLegacyTaskProfiles(settings = {}) { return { taskProfilesVersion: Number(settings?.taskProfilesVersion || 0), diff --git a/tests/persistence-reducer.mjs b/tests/persistence-reducer.mjs index 904eeb1..55cf3ab 100644 --- a/tests/persistence-reducer.mjs +++ b/tests/persistence-reducer.mjs @@ -2,11 +2,14 @@ import assert from "node:assert/strict"; import { + PERSISTENCE_EVENT_TYPES, applyPersistenceRecordToBatchStatus, buildAcceptedPersistenceStatePatch, buildBatchPersistenceRecordFromPersistResult, buildQueuedPersistenceStatePatch, planAcceptedPendingClear, + reducePersistenceState, + reducePersistenceStatePatch, } from "../sync/persistence-reducer.js"; const acceptedRecord = buildBatchPersistenceRecordFromPersistResult({ @@ -88,6 +91,38 @@ assert.deepEqual( console.log(" ✓ canonical accepted state clears pending and queued fields"); +assert.deepEqual( + reducePersistenceStatePatch( + { + lastAcceptedRevision: 9, + pendingPersist: true, + writesBlocked: true, + }, + { + type: PERSISTENCE_EVENT_TYPES.ACCEPTED, + persistenceRecord: acceptedRecord, + }, + ), + buildAcceptedPersistenceStatePatch({ + currentState: { + lastAcceptedRevision: 9, + pendingPersist: true, + writesBlocked: true, + }, + persistenceRecord: acceptedRecord, + }), +); + +const reducedAcceptedState = reducePersistenceState( + { pendingPersist: true, writesBlocked: true, lastAcceptedRevision: 9, custom: "keep" }, + { type: PERSISTENCE_EVENT_TYPES.ACCEPTED, persistenceRecord: acceptedRecord }, +); +assert.equal(reducedAcceptedState.pendingPersist, false); +assert.equal(reducedAcceptedState.custom, "keep"); +assert.equal(reducedAcceptedState.acceptedStorageTier, "authority-sql"); + +console.log(" ✓ accepted persistence state updates are event-reduced patches"); + const queuedPatch = buildQueuedPersistenceStatePatch({ currentState: { queuedPersistRevision: 6, @@ -119,6 +154,23 @@ assert.equal(blockedQueuedPatch.lastRecoverableStorageTier, "shadow"); console.log(" ✓ queued state preserves max revision and recovery-only semantics"); +assert.deepEqual( + reducePersistenceStatePatch( + { queuedPersistRevision: 6, lastRecoverableStorageTier: "metadata-full" }, + { + type: PERSISTENCE_EVENT_TYPES.QUEUED, + reason: "extraction-batch-complete:pending", + revision: 10, + chatId: "chat-a", + immediate: true, + recoverableTier: "shadow", + }, + ), + queuedPatch, +); + +assert.deepEqual(reducePersistenceStatePatch({}, { type: "unknown" }), {}); + const batchStatus = { completed: true, historyAdvanceAllowed: false,