refactor(persistence): route state updates through events

This commit is contained in:
youzini
2026-05-30 14:06:21 +00:00
parent 29a79a854e
commit 4a1560318a
4 changed files with 105 additions and 8 deletions

View File

@@ -66,11 +66,11 @@ import {
repairLegacyLastBatchPersistenceStatus,
} from "./sync/legacy-persistence-repair.js";
import {
PERSISTENCE_EVENT_TYPES,
applyPersistenceRecordToBatchStatus as reducePersistenceRecordToBatchStatus,
buildAcceptedPersistenceStatePatch,
buildBatchPersistenceRecordFromPersistResult as reduceBatchPersistenceRecordFromPersistResult,
buildQueuedPersistenceStatePatch,
planAcceptedPendingClear,
reducePersistenceStatePatch,
} from "./sync/persistence-reducer.js";
import {
buildExtractionMessages,
@@ -15105,8 +15105,8 @@ function applyAcceptedPendingPersistState(
if (persistenceRecord.accepted === true) {
updateGraphPersistenceState(
buildAcceptedPersistenceStatePatch({
currentState: graphPersistenceState,
reducePersistenceStatePatch(graphPersistenceState, {
type: PERSISTENCE_EVENT_TYPES.ACCEPTED,
persistenceRecord,
clearQueued: false,
}),
@@ -15368,8 +15368,8 @@ function queueGraphPersist(
}
updateGraphPersistenceState(
buildQueuedPersistenceStatePatch({
currentState: graphPersistenceState,
reducePersistenceStatePatch(graphPersistenceState, {
type: PERSISTENCE_EVENT_TYPES.QUEUED,
reason,
revision: normalizedRevision,
chatId: queuedChatId,

View File

@@ -1,7 +1,8 @@
// ST-BME persistence reducer core.
//
// Pure helpers only: no IO, no graph mutation, no UI side effects. Phase 2
// centralizes accepted/queued/pending invariants while leaving durable routing
// Pure helpers only: no IO, no graph mutation, no UI side effects.
// Phase 2 centralized accepted/queued/pending invariants; Phase 5 routes
// call sites through explicit events while leaving durable routing
// (IndexedDB/OPFS/Authority/Luker) in the existing orchestration layer.
import {
@@ -17,6 +18,11 @@ const SAVED_BATCH_ACCEPTED_TIERS = new Set([
"luker-chat-state",
]);
export const PERSISTENCE_EVENT_TYPES = Object.freeze({
ACCEPTED: "accepted",
QUEUED: "queued",
});
function normalizeRevision(value = 0) {
const numeric = Number(value || 0);
if (!Number.isFinite(numeric) || numeric <= 0) return 0;
@@ -142,3 +148,38 @@ export function buildQueuedPersistenceStatePatch({
export function planAcceptedPendingClear(options = {}) {
return planAcceptedPendingPersistenceRepair(options);
}
export function reducePersistenceStatePatch(currentState = null, event = null) {
const type = String(event?.type || "").trim();
switch (type) {
case PERSISTENCE_EVENT_TYPES.ACCEPTED:
return buildAcceptedPersistenceStatePatch({
currentState,
persistenceRecord: event.persistenceRecord,
acceptedRevision: event.acceptedRevision,
acceptedStorageTier: event.acceptedStorageTier,
acceptedBy: event.acceptedBy,
clearQueued: event.clearQueued !== false,
});
case PERSISTENCE_EVENT_TYPES.QUEUED:
return buildQueuedPersistenceStatePatch({
currentState,
reason: event.reason,
revision: event.revision,
chatId: event.chatId,
immediate: event.immediate === true,
recoverableTier: event.recoverableTier,
});
default:
return {};
}
}
export function reducePersistenceState(currentState = null, event = null) {
return {
...(currentState && typeof currentState === "object" ? currentState : {}),
...reducePersistenceStatePatch(currentState, event),
};
}

View File

@@ -137,11 +137,13 @@ import {
repairLegacyLastBatchPersistenceStatus,
} from "../sync/legacy-persistence-repair.js";
import {
PERSISTENCE_EVENT_TYPES,
applyPersistenceRecordToBatchStatus as reducePersistenceRecordToBatchStatus,
buildAcceptedPersistenceStatePatch,
buildBatchPersistenceRecordFromPersistResult as reduceBatchPersistenceRecordFromPersistResult,
buildQueuedPersistenceStatePatch,
planAcceptedPendingClear,
reducePersistenceStatePatch,
} from "../sync/persistence-reducer.js";
import {
clampFloat,
@@ -730,11 +732,13 @@ async function createGraphPersistenceHarness({
isRecoveryOnlyLegacyPersistenceTier,
planAcceptedPendingPersistenceRepair,
repairLegacyLastBatchPersistenceStatus,
PERSISTENCE_EVENT_TYPES,
reducePersistenceRecordToBatchStatus,
buildAcceptedPersistenceStatePatch,
reduceBatchPersistenceRecordFromPersistResult,
buildQueuedPersistenceStatePatch,
planAcceptedPendingClear,
reducePersistenceStatePatch,
migrateLegacyTaskProfiles(settings = {}) {
return {
taskProfilesVersion: Number(settings?.taskProfilesVersion || 0),

View File

@@ -2,11 +2,14 @@
import assert from "node:assert/strict";
import {
PERSISTENCE_EVENT_TYPES,
applyPersistenceRecordToBatchStatus,
buildAcceptedPersistenceStatePatch,
buildBatchPersistenceRecordFromPersistResult,
buildQueuedPersistenceStatePatch,
planAcceptedPendingClear,
reducePersistenceState,
reducePersistenceStatePatch,
} from "../sync/persistence-reducer.js";
const acceptedRecord = buildBatchPersistenceRecordFromPersistResult({
@@ -88,6 +91,38 @@ assert.deepEqual(
console.log(" ✓ canonical accepted state clears pending and queued fields");
assert.deepEqual(
reducePersistenceStatePatch(
{
lastAcceptedRevision: 9,
pendingPersist: true,
writesBlocked: true,
},
{
type: PERSISTENCE_EVENT_TYPES.ACCEPTED,
persistenceRecord: acceptedRecord,
},
),
buildAcceptedPersistenceStatePatch({
currentState: {
lastAcceptedRevision: 9,
pendingPersist: true,
writesBlocked: true,
},
persistenceRecord: acceptedRecord,
}),
);
const reducedAcceptedState = reducePersistenceState(
{ pendingPersist: true, writesBlocked: true, lastAcceptedRevision: 9, custom: "keep" },
{ type: PERSISTENCE_EVENT_TYPES.ACCEPTED, persistenceRecord: acceptedRecord },
);
assert.equal(reducedAcceptedState.pendingPersist, false);
assert.equal(reducedAcceptedState.custom, "keep");
assert.equal(reducedAcceptedState.acceptedStorageTier, "authority-sql");
console.log(" ✓ accepted persistence state updates are event-reduced patches");
const queuedPatch = buildQueuedPersistenceStatePatch({
currentState: {
queuedPersistRevision: 6,
@@ -119,6 +154,23 @@ assert.equal(blockedQueuedPatch.lastRecoverableStorageTier, "shadow");
console.log(" ✓ queued state preserves max revision and recovery-only semantics");
assert.deepEqual(
reducePersistenceStatePatch(
{ queuedPersistRevision: 6, lastRecoverableStorageTier: "metadata-full" },
{
type: PERSISTENCE_EVENT_TYPES.QUEUED,
reason: "extraction-batch-complete:pending",
revision: 10,
chatId: "chat-a",
immediate: true,
recoverableTier: "shadow",
},
),
queuedPatch,
);
assert.deepEqual(reducePersistenceStatePatch({}, { type: "unknown" }), {});
const batchStatus = {
completed: true,
historyAdvanceAllowed: false,