Slim manual cloud backups and guard truncated journal rollback

This commit is contained in:
Hao19911125
2026-04-12 10:43:38 +08:00
parent aec756f180
commit 580a049442
4 changed files with 220 additions and 11 deletions

View File

@@ -22,6 +22,8 @@ const BATCH_JOURNAL_LIMIT = 96;
const MAINTENANCE_JOURNAL_LIMIT = 20;
export const BATCH_JOURNAL_VERSION = 2;
export const PROCESSED_MESSAGE_HASH_VERSION = 2;
export const MANUAL_BACKUP_BATCH_JOURNAL_COVERAGE_KEY =
"manualBackupBatchJournalCoverage";
export function buildVectorCollectionId(chatId) {
return `st-bme::${chatId || "unknown-chat"}`;
@@ -51,6 +53,7 @@ export function createDefaultHistoryState(chatId = "") {
activeUserPovOwner: "",
activeRecallOwnerKey: "",
recentRecallOwnerKeys: [],
[MANUAL_BACKUP_BATCH_JOURNAL_COVERAGE_KEY]: null,
};
}
@@ -86,6 +89,63 @@ export function createDefaultMaintenanceJournal() {
return [];
}
/**
 * Coerce a persisted manual-backup coverage record into its canonical shape.
 *
 * @param {*} [value=null] - Raw value read from history state; anything that
 *   is not a plain object is rejected.
 * @returns {{truncated: boolean, earliestRetainedFloor: number|null, retainedCount: number}|null}
 *   Normalized coverage record, or null when the input is unusable.
 */
function normalizeManualBackupBatchJournalCoverage(value = null) {
  const isPlainObject =
    Boolean(value) && typeof value === "object" && !Array.isArray(value);
  if (!isPlainObject) return null;
  const rawFloor = Number(value.earliestRetainedFloor);
  const rawCount = Number(value.retainedCount);
  // Floors are clamped to non-negative integers; a missing/invalid floor
  // becomes null while a missing/invalid count defaults to zero.
  const earliestRetainedFloor = Number.isFinite(rawFloor)
    ? Math.max(0, Math.floor(rawFloor))
    : null;
  const retainedCount = Number.isFinite(rawCount)
    ? Math.max(0, Math.floor(rawCount))
    : 0;
  return {
    // Only a literal `true` counts as truncated; truthy strings do not.
    truncated: value.truncated === true,
    earliestRetainedFloor,
    retainedCount,
  };
}
/**
 * Find the smallest processedRange start floor across a journal list.
 *
 * @param {Array<{processedRange?: [number, number]}>} [journals=[]] - Batch
 *   journal entries; non-array input and malformed entries are ignored.
 * @returns {number|null} The earliest normalized (non-negative integer) start
 *   floor, or null when no entry has a finite start floor.
 */
function getEarliestJournalCoverageStartFloor(journals = []) {
  const entries = Array.isArray(journals) ? journals : [];
  const startFloors = entries
    .map((journal) => {
      const range = journal?.processedRange;
      // Non-array ranges yield NaN and are filtered out below.
      return Array.isArray(range) ? Number(range[0]) : NaN;
    })
    .filter((floor) => Number.isFinite(floor))
    .map((floor) => Math.max(0, Math.floor(floor)));
  return startFloors.length > 0 ? Math.min(...startFloors) : null;
}
/**
 * Compute the minimum dirty floor that journal-based rollback can reach.
 *
 * Combines the earliest floor actually covered by the on-graph journals with
 * the retained-coverage floor recorded when a truncated manual backup was
 * taken; the stricter (larger) of the two wins.
 *
 * @param {object} graph - Runtime graph whose historyState may carry a
 *   manual-backup coverage record.
 * @param {Array} [journals=[]] - Current batch journal entries.
 * @returns {number|null} Required coverage start floor, or null when neither
 *   source provides a finite floor.
 */
function getRequiredJournalCoverageStartFloor(graph, journals = []) {
  const actualFloor = getEarliestJournalCoverageStartFloor(journals);
  const coverage = normalizeManualBackupBatchJournalCoverage(
    graph?.historyState?.[MANUAL_BACKUP_BATCH_JOURNAL_COVERAGE_KEY],
  );
  // The manual-backup floor only applies when the backup was truncated and
  // recorded a usable earliest retained floor.
  let manualFloor = null;
  if (
    coverage?.truncated === true &&
    Number.isFinite(coverage.earliestRetainedFloor)
  ) {
    manualFloor = coverage.earliestRetainedFloor;
  }
  const finiteFloors = [actualFloor, manualFloor].filter((floor) =>
    Number.isFinite(floor),
  );
  return finiteFloors.length > 0 ? Math.max(...finiteFloors) : null;
}
export function normalizeGraphRuntimeState(graph, chatId = "") {
if (!graph || typeof graph !== "object") {
return graph;
@@ -99,6 +159,10 @@ export function normalizeGraphRuntimeState(graph, chatId = "") {
...createDefaultHistoryState(chatId),
...(graph.historyState || {}),
};
historyState[MANUAL_BACKUP_BATCH_JOURNAL_COVERAGE_KEY] =
normalizeManualBackupBatchJournalCoverage(
historyState[MANUAL_BACKUP_BATCH_JOURNAL_COVERAGE_KEY],
);
const vectorIndexState = {
...createDefaultVectorIndexState(chatId),
...(graph.vectorIndexState || {}),
@@ -1231,6 +1295,17 @@ export function rollbackBatch(graph, journal) {
export function findJournalRecoveryPoint(graph, dirtyFromFloor) {
const journals = Array.isArray(graph?.batchJournal) ? graph.batchJournal : [];
const requiredCoverageFloor = getRequiredJournalCoverageStartFloor(
graph,
journals,
);
if (
Number.isFinite(dirtyFromFloor) &&
Number.isFinite(requiredCoverageFloor) &&
dirtyFromFloor < requiredCoverageFloor
) {
return null;
}
const affectedIndex = journals.findIndex((journal) => {
const range = Array.isArray(journal?.processedRange)
? journal.processedRange

View File

@@ -1,5 +1,8 @@
import { BmeDatabase } from "./bme-db.js";
import { PROCESSED_MESSAGE_HASH_VERSION } from "../runtime/runtime-state.js";
import {
MANUAL_BACKUP_BATCH_JOURNAL_COVERAGE_KEY,
PROCESSED_MESSAGE_HASH_VERSION,
} from "../runtime/runtime-state.js";
const BME_SYNC_FILE_PREFIX = "ST-BME_sync_";
const BME_SYNC_FILE_SUFFIX = ".json";
@@ -30,6 +33,7 @@ const RUNTIME_TIMELINE_STATE_META_KEY = "timelineState";
const RUNTIME_LAST_PROCESSED_SEQ_META_KEY = "runtimeLastProcessedSeq";
const RUNTIME_GRAPH_VERSION_META_KEY = "runtimeGraphVersion";
const RUNTIME_BATCH_JOURNAL_LIMIT = 96;
const MANUAL_BACKUP_BATCH_JOURNAL_LIMIT = 4;
/**
 * Coerce any chat-id value (including null/undefined) into a trimmed string.
 *
 * @param {*} chatId - Raw chat identifier.
 * @returns {string} Trimmed string form; empty string for nullish input.
 */
function normalizeChatId(chatId) {
  const raw = chatId ?? "";
  return String(raw).trim();
}
async function writeBackupManifest(entries = [], options = {}) {
const fetchImpl = getFetch(options);
const payload = JSON.stringify(entries, null, 2);
const payload = JSON.stringify(entries);
const response = await fetchImpl("/api/files/upload", {
method: "POST",
headers: {
@@ -576,7 +580,7 @@ async function writeBackupEnvelope(envelope, chatId, options = {}) {
const normalizedChatId = normalizeChatId(chatId);
const filename = buildBackupFilename(normalizedChatId);
const fetchImpl = getFetch(options);
const payload = JSON.stringify(envelope, null, 2);
const payload = JSON.stringify(envelope);
const response = await fetchImpl("/api/files/upload", {
method: "POST",
headers: {
@@ -1048,6 +1052,59 @@ function normalizeRuntimeHistoryMeta(value = {}, fallbackChatId = "") {
};
}
/**
 * Resolve the smallest processedRange start floor among retained journals.
 *
 * @param {Array<{processedRange?: [number, number]}>} [journals=[]] - Journal
 *   entries kept in a slimmed manual backup; non-array input is ignored.
 * @returns {number|null} Earliest normalized (non-negative integer) start
 *   floor, or null when no entry carries a finite start floor.
 */
function resolveEarliestRetainedBatchFloor(journals = []) {
  const entries = Array.isArray(journals) ? journals : [];
  return entries.reduce((earliest, journal) => {
    const range = journal?.processedRange;
    // Entries whose range is not an array (or whose start is non-numeric)
    // contribute nothing to the minimum.
    const start = Array.isArray(range) ? Number(range[0]) : NaN;
    if (!Number.isFinite(start)) return earliest;
    const floor = Math.max(0, Math.floor(start));
    return earliest == null ? floor : Math.min(earliest, floor);
  }, null);
}
/**
 * Build a slimmed snapshot for a manual cloud backup.
 *
 * Keeps only the newest MANUAL_BACKUP_BATCH_JOURNAL_LIMIT batch-journal
 * entries, drops the maintenance journal entirely, and stamps the history
 * state with a coverage record describing what was retained so restore-time
 * rollback logic can refuse to rewind past the retained window.
 *
 * @param {object} [snapshot={}] - Raw sync snapshot to slim.
 * @param {string} [chatId=""] - Chat the snapshot belongs to.
 * @returns {{meta: object, nodes: Array, edges: Array, tombstones: Array, state: object}}
 *   Serializable backup snapshot.
 */
function buildManualBackupSnapshot(snapshot = {}, chatId = "") {
  const normalized = normalizeSyncSnapshot(snapshot, chatId);
  const meta = toSerializableData(normalized.meta, {});
  const rawJournal = meta[RUNTIME_BATCH_JOURNAL_META_KEY];
  const fullJournal = Array.isArray(rawJournal)
    ? toSerializableData(rawJournal, [])
    : [];
  // Negative slice index keeps the trailing (newest) entries.
  const keptJournal = fullJournal.slice(-MANUAL_BACKUP_BATCH_JOURNAL_LIMIT);
  const historyState = normalizeRuntimeHistoryMeta(
    meta[RUNTIME_HISTORY_META_KEY],
    chatId,
  );
  historyState[MANUAL_BACKUP_BATCH_JOURNAL_COVERAGE_KEY] = {
    truncated: fullJournal.length > keptJournal.length,
    earliestRetainedFloor: resolveEarliestRetainedBatchFloor(keptJournal),
    retainedCount: keptJournal.length,
  };
  meta[RUNTIME_HISTORY_META_KEY] = historyState;
  meta[RUNTIME_BATCH_JOURNAL_META_KEY] = keptJournal;
  // Maintenance journal is device-local bookkeeping; never ship it in a
  // manual backup.
  meta[RUNTIME_MAINTENANCE_JOURNAL_META_KEY] = [];
  return {
    meta,
    nodes: toSerializableData(normalized.nodes, []),
    edges: toSerializableData(normalized.edges, []),
    tombstones: toSerializableData(normalized.tombstones, []),
    state: toSerializableData(normalized.state, {
      lastProcessedFloor: -1,
      extractionCount: 0,
    }),
  };
}
function mergeRuntimeHistoryMeta(localMeta = {}, remoteMeta = {}, options = {}) {
const localHistory = normalizeRuntimeHistoryMeta(localMeta, options.chatId);
const remoteHistory = normalizeRuntimeHistoryMeta(remoteMeta, options.chatId);
@@ -1856,19 +1913,14 @@ export async function backupToServer(chatId, options = {}) {
nowMs,
);
const backupSnapshot = buildManualBackupSnapshot(snapshot, normalizedChatId);
const envelope = {
kind: "st-bme-backup",
version: BME_BACKUP_SCHEMA_VERSION,
chatId: normalizedChatId,
createdAt: nowMs,
sourceDeviceId: deviceId,
snapshot: {
meta: toSerializableData(snapshot.meta, {}),
nodes: toSerializableData(snapshot.nodes, []),
edges: toSerializableData(snapshot.edges, []),
tombstones: toSerializableData(snapshot.tombstones, []),
state: toSerializableData(snapshot.state, {}),
},
snapshot: backupSnapshot,
};
const uploadResult = await writeBackupEnvelope(

View File

@@ -22,6 +22,7 @@ import {
syncNow,
upload,
} from "../sync/bme-sync.js";
import { MANUAL_BACKUP_BATCH_JOURNAL_COVERAGE_KEY } from "../runtime/runtime-state.js";
const PREFIX = "[ST-BME][indexeddb-sync]";
@@ -159,6 +160,7 @@ function createMockFetchEnvironment() {
remoteFiles.set(body.name, payload);
logs.uploadedPayloads.push({
name: body.name,
decoded,
payload,
});
return createJsonResponse(200, { path: `/user/files/${body.name}` });
@@ -605,7 +607,7 @@ async function testManualCloudModeGuards() {
}
async function testManualBackupAndRestoreFlow() {
const { fetch, remoteFiles } = createMockFetchEnvironment();
const { fetch, remoteFiles, logs } = createMockFetchEnvironment();
const dbByChatId = new Map();
const db = new FakeDb("chat-backup-flow", {
meta: {
@@ -617,6 +619,23 @@ async function testManualBackupAndRestoreFlow() {
nodeCount: 1,
edgeCount: 0,
tombstoneCount: 0,
runtimeHistoryState: {
chatId: "chat-backup-flow",
lastProcessedAssistantFloor: 4,
extractionCount: 2,
},
runtimeBatchJournal: [
{ id: "journal-1", processedRange: [0, 0], createdAt: 11 },
{ id: "journal-2", processedRange: [1, 1], createdAt: 22 },
{ id: "journal-3", processedRange: [2, 2], createdAt: 33 },
{ id: "journal-4", processedRange: [3, 3], createdAt: 44 },
{ id: "journal-5", processedRange: [4, 4], createdAt: 55 },
{ id: "journal-6", processedRange: [5, 5], createdAt: 66 },
],
maintenanceJournal: [
{ id: "maintenance-a", updatedAt: 70 },
{ id: "maintenance-b", updatedAt: 80 },
],
},
nodes: [{ id: "local-node", updatedAt: 80 }],
edges: [],
@@ -642,10 +661,36 @@ async function testManualBackupAndRestoreFlow() {
assert.equal(db.meta.get("syncDirty"), false);
assert.ok(Number(db.meta.get("lastBackupUploadedAt")) > 0);
assert.ok(String(db.meta.get("lastBackupFilename") || "").startsWith("ST-BME_backup_"));
const backupPayload = remoteFiles.get(backupResult.filename);
assert.ok(backupPayload, "manual backup should be written to remote files");
assert.equal(backupPayload.snapshot.meta.runtimeBatchJournal.length, 4);
assert.deepEqual(
backupPayload.snapshot.meta.runtimeBatchJournal.map((entry) => entry.id),
["journal-3", "journal-4", "journal-5", "journal-6"],
);
assert.equal(backupPayload.snapshot.meta.maintenanceJournal.length, 0);
assert.deepEqual(
backupPayload.snapshot.meta.runtimeHistoryState[MANUAL_BACKUP_BATCH_JOURNAL_COVERAGE_KEY],
{
truncated: true,
earliestRetainedFloor: 2,
retainedCount: 4,
},
);
const backupUploadLog = logs.uploadedPayloads.find(
(entry) => entry.name === backupResult.filename,
);
assert.ok(backupUploadLog);
assert.equal(backupUploadLog.decoded.includes("\n"), false);
const manifestResult = await listServerBackups(runtime);
assert.equal(manifestResult.entries.length, 1);
assert.equal(manifestResult.entries[0].chatId, "chat-backup-flow");
const manifestUploadLog = logs.uploadedPayloads.find(
(entry) => entry.name === "ST-BME_BackupManifest.json",
);
assert.ok(manifestUploadLog);
assert.equal(manifestUploadLog.decoded.includes("\n"), false);
db.snapshot = {
meta: {
@@ -670,6 +715,16 @@ async function testManualBackupAndRestoreFlow() {
const restoreResult = await restoreFromServer("chat-backup-flow", runtime);
assert.equal(restoreResult.restored, true);
assert.equal(db.snapshot.nodes[0].id, "local-node");
assert.equal(db.snapshot.meta.runtimeBatchJournal.length, 4);
assert.equal(db.snapshot.meta.maintenanceJournal.length, 0);
assert.deepEqual(
db.snapshot.meta.runtimeHistoryState[MANUAL_BACKUP_BATCH_JOURNAL_COVERAGE_KEY],
{
truncated: true,
earliestRetainedFloor: 2,
retainedCount: 4,
},
);
assert.ok(Number(db.meta.get("lastBackupRestoredAt")) > 0);
const safetyStatus = await getRestoreSafetySnapshotStatus(
"chat-backup-flow",

View File

@@ -1,5 +1,6 @@
import assert from "node:assert/strict";
import {
MANUAL_BACKUP_BATCH_JOURNAL_COVERAGE_KEY,
appendBatchJournal,
clearHistoryDirty,
cloneGraphSnapshot,
@@ -264,6 +265,32 @@ assert.ok(recoveryPoint);
assert.equal(recoveryPoint.path, "reverse-journal");
assert.equal(recoveryPoint.affectedJournals[0].processedRange[1], 3);
const truncatedCoverageGraph = createEmptyGraph();
truncatedCoverageGraph.historyState.chatId = "chat-truncated-history-test";
truncatedCoverageGraph.historyState[MANUAL_BACKUP_BATCH_JOURNAL_COVERAGE_KEY] = {
truncated: true,
earliestRetainedFloor: 4,
retainedCount: 4,
};
truncatedCoverageGraph.batchJournal = [
{ id: "journal-4", journalVersion: 2, processedRange: [4, 4] },
{ id: "journal-5", journalVersion: 2, processedRange: [5, 5] },
{ id: "journal-6", journalVersion: 2, processedRange: [6, 6] },
{ id: "journal-7", journalVersion: 2, processedRange: [7, 7] },
];
assert.equal(
findJournalRecoveryPoint(truncatedCoverageGraph, 3),
null,
"dirty floor earlier than retained backup coverage should reject partial rollback",
);
const retainedCoverageRecoveryPoint = findJournalRecoveryPoint(
truncatedCoverageGraph,
5,
);
assert.ok(retainedCoverageRecoveryPoint);
assert.equal(retainedCoverageRecoveryPoint.path, "reverse-journal");
assert.equal(retainedCoverageRecoveryPoint.affectedJournals.length, 3);
rollbackBatch(graph, recoveryPoint.affectedJournals[0]);
assert.equal(graph.nodes.length, 0);
assert.equal(graph.historyState.lastProcessedAssistantFloor, -1);