fix: harden persistence tiers and opfs durability

This commit is contained in:
Youzini-afk
2026-04-14 15:43:59 +08:00
parent 246af61f6c
commit 33b8d298f7
11 changed files with 1916 additions and 438 deletions

View File

@@ -267,6 +267,30 @@ function buildSnapshotFilename(prefix, revision = 0, stampMs = Date.now()) {
return `${String(prefix || "snapshot")}.${normalizeRevision(revision)}.${normalizeTimestamp(stampMs)}.json`;
}
function escapeRegex(value = "") {
return String(value || "").replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
}
function parseSnapshotFilenameCandidate(name = "", prefix = "") {
const normalizedName = String(name || "").trim();
const normalizedPrefix = String(prefix || "").trim();
if (!normalizedName || !normalizedPrefix) {
return null;
}
const matcher = new RegExp(
`^${escapeRegex(normalizedPrefix)}\\.(\\d+)\\.(\\d+)\\.json$`,
);
const match = normalizedName.match(matcher);
if (!match) {
return null;
}
return {
filename: normalizedName,
revision: normalizeRevision(match[1]),
stampMs: normalizeTimestamp(match[2], 0),
};
}
function isNotFoundError(error) {
const name = String(error?.name || "");
const message = String(error?.message || "");
@@ -330,6 +354,37 @@ async function deleteFileIfExists(parentHandle, name) {
}
}
async function listDirectoryFileNames(parentHandle) {
if (!parentHandle) return [];
if (parentHandle.files instanceof Map) {
return Array.from(parentHandle.files.keys()).map((name) => String(name || ""));
}
const names = [];
if (typeof parentHandle.keys === "function") {
for await (const key of parentHandle.keys()) {
if (typeof key === "string" && key) {
names.push(key);
}
}
return names;
}
if (typeof parentHandle.entries === "function") {
for await (const [name, handle] of parentHandle.entries()) {
if (
typeof name === "string" &&
name &&
(!handle || handle.kind === "file" || typeof handle.getFile === "function")
) {
names.push(name);
}
}
}
return names;
}
function normalizeSnapshotState(snapshot = {}) {
const meta =
snapshot?.meta && typeof snapshot.meta === "object" && !Array.isArray(snapshot.meta)
@@ -396,19 +451,30 @@ function buildSnapshotFromStoredParts(manifest, corePayload = {}, auxPayload = {
const nodes = sanitizeSnapshotRecordArray(corePayload?.nodes);
const edges = sanitizeSnapshotRecordArray(corePayload?.edges);
const tombstones = sanitizeSnapshotRecordArray(auxPayload?.tombstones);
const mergedMeta = {
...baseMeta,
...coreMeta,
...auxMeta,
};
const state = normalizeSnapshotState({
meta: {
...baseMeta,
...coreMeta,
...auxMeta,
meta: mergedMeta,
state: {
...(corePayload?.state &&
typeof corePayload.state === "object" &&
!Array.isArray(corePayload.state)
? corePayload.state
: {}),
...(Number.isFinite(Number(baseMeta?.lastProcessedFloor))
? { lastProcessedFloor: Number(baseMeta.lastProcessedFloor) }
: {}),
...(Number.isFinite(Number(baseMeta?.extractionCount))
? { extractionCount: Number(baseMeta.extractionCount) }
: {}),
},
state: corePayload?.state,
});
const meta = {
...createDefaultMetaValues(baseMeta.chatId || manifest?.chatId || ""),
...toPlainData(baseMeta, {}),
...toPlainData(coreMeta, {}),
...toPlainData(auxMeta, {}),
...toPlainData(mergedMeta, {}),
chatId: normalizeChatId(baseMeta.chatId || manifest?.chatId || ""),
schemaVersion: BME_DB_SCHEMA_VERSION,
nodeCount: nodes.length,
@@ -524,6 +590,14 @@ export class OpfsGraphStore {
: getDefaultOpfsRootDirectory;
this._chatDirectoryPromise = null;
this._manifestCache = null;
this._writeChain = Promise.resolve();
this._writeQueueDepth = 0;
this._writeLockState = {
active: false,
queueDepth: 0,
lastReason: "",
updatedAt: 0,
};
}
async open() {
@@ -534,11 +608,79 @@ export class OpfsGraphStore {
async close() {
this._chatDirectoryPromise = null;
this._manifestCache = null;
this._writeChain = Promise.resolve();
this._writeQueueDepth = 0;
this._writeLockState = {
active: false,
queueDepth: 0,
lastReason: "",
updatedAt: 0,
};
}
getWriteLockSnapshot() {
return toPlainData(this._writeLockState, this._writeLockState);
}
async _awaitPendingWrites() {
try {
await this._writeChain;
} catch {
// swallow previous write failure for read barrier
}
}
_setWriteLockState(patch = {}) {
this._writeLockState = {
...this._writeLockState,
...(patch || {}),
updatedAt: Date.now(),
};
return this._writeLockState;
}
async _runSerializedWrite(reason = "opfs-write", task = null) {
if (typeof task !== "function") {
throw new Error("OpfsGraphStore serialized write task is required");
}
this._writeQueueDepth += 1;
this._setWriteLockState({
active: true,
queueDepth: this._writeQueueDepth,
lastReason: String(reason || "opfs-write"),
});
const runTask = async () => {
try {
return await task();
} finally {
this._writeQueueDepth = Math.max(0, this._writeQueueDepth - 1);
this._setWriteLockState({
active: this._writeQueueDepth > 0,
queueDepth: this._writeQueueDepth,
lastReason: String(reason || "opfs-write"),
});
}
};
const nextWrite = this._writeChain.catch(() => null).then(runTask);
this._writeChain = nextWrite.catch(() => null);
return await nextWrite;
}
async getMeta(key, fallbackValue = null) {
const normalizedKey = normalizeRecordId(key);
if (!normalizedKey) return fallbackValue;
if (OPFS_MANIFEST_META_KEYS.has(normalizedKey)) {
const manifest = await this._ensureManifest();
const manifestMeta =
manifest?.meta &&
typeof manifest.meta === "object" &&
!Array.isArray(manifest.meta)
? manifest.meta
: {};
return Object.prototype.hasOwnProperty.call(manifestMeta, normalizedKey)
? manifestMeta[normalizedKey]
: fallbackValue;
}
const snapshot = await this._loadSnapshot();
return Object.prototype.hasOwnProperty.call(snapshot.meta, normalizedKey)
? snapshot.meta[normalizedKey]
@@ -548,22 +690,12 @@ export class OpfsGraphStore {
async setMeta(key, value) {
const normalizedKey = normalizeRecordId(key);
if (!normalizedKey) return null;
const snapshot = await this._loadSnapshot();
snapshot.meta[normalizedKey] = toPlainData(value, value);
if (normalizedKey === "lastProcessedFloor") {
snapshot.state.lastProcessedFloor = Number.isFinite(Number(value))
? Number(value)
: META_DEFAULT_LAST_PROCESSED_FLOOR;
}
if (normalizedKey === "extractionCount") {
snapshot.state.extractionCount = Number.isFinite(Number(value))
? Number(value)
: META_DEFAULT_EXTRACTION_COUNT;
}
await this._writeResolvedSnapshot(snapshot);
await this.patchMeta({
[normalizedKey]: value,
});
return {
key: normalizedKey,
value: snapshot.meta[normalizedKey],
value: await this.getMeta(normalizedKey, null),
updatedAt: Date.now(),
};
}
@@ -572,27 +704,68 @@ export class OpfsGraphStore {
if (!record || typeof record !== "object" || Array.isArray(record)) {
return {};
}
const snapshot = await this._loadSnapshot();
const entries = [];
for (const [rawKey, value] of Object.entries(record)) {
const key = normalizeRecordId(rawKey);
if (!key) continue;
const normalizedValue = toPlainData(value, value);
snapshot.meta[key] = normalizedValue;
if (key === "lastProcessedFloor") {
snapshot.state.lastProcessedFloor = Number.isFinite(Number(normalizedValue))
? Number(normalizedValue)
: META_DEFAULT_LAST_PROCESSED_FLOOR;
}
if (key === "extractionCount") {
snapshot.state.extractionCount = Number.isFinite(Number(normalizedValue))
? Number(normalizedValue)
: META_DEFAULT_EXTRACTION_COUNT;
}
entries.push([key, normalizedValue]);
const entries = Object.entries(record)
.map(([rawKey, value]) => [normalizeRecordId(rawKey), toPlainData(value, value)])
.filter(([key]) => Boolean(key));
if (!entries.length) {
return {};
}
await this._writeResolvedSnapshot(snapshot);
return Object.fromEntries(entries);
const allManifestOnly = entries.every(([key]) =>
OPFS_MANIFEST_META_KEYS.has(key),
);
if (allManifestOnly) {
return await this._runSerializedWrite("patchMeta:manifest", async () => {
const manifest = await this._ensureManifest({ awaitWrites: false });
const nextMeta = {
...createDefaultMetaValues(this.chatId),
...(manifest?.meta && typeof manifest.meta === "object" && !Array.isArray(manifest.meta)
? toPlainData(manifest.meta, {})
: {}),
chatId: this.chatId,
storagePrimary: OPFS_STORE_KIND,
storageMode: this.storeMode,
};
for (const [key, normalizedValue] of entries) {
nextMeta[key] = normalizedValue;
}
const nextManifest = {
...(manifest || {}),
version: OPFS_MANIFEST_VERSION,
chatId: this.chatId,
storeKind: OPFS_STORE_KIND,
storeMode: this.storeMode,
activeCoreFilename: String(manifest?.activeCoreFilename || ""),
activeAuxFilename: String(manifest?.activeAuxFilename || ""),
meta: nextMeta,
};
const chatDirectory = await this._getChatDirectory();
await writeJsonFile(chatDirectory, OPFS_MANIFEST_FILENAME, nextManifest);
this._manifestCache = nextManifest;
return Object.fromEntries(entries);
});
}
return await this._runSerializedWrite("patchMeta:snapshot", async () => {
const snapshot = await this._loadSnapshot({ awaitWrites: false });
const appliedEntries = [];
for (const [key, normalizedValue] of entries) {
snapshot.meta[key] = normalizedValue;
if (key === "lastProcessedFloor") {
snapshot.state.lastProcessedFloor = Number.isFinite(Number(normalizedValue))
? Number(normalizedValue)
: META_DEFAULT_LAST_PROCESSED_FLOOR;
}
if (key === "extractionCount") {
snapshot.state.extractionCount = Number.isFinite(Number(normalizedValue))
? Number(normalizedValue)
: META_DEFAULT_EXTRACTION_COUNT;
}
appliedEntries.push([key, normalizedValue]);
}
await this._writeResolvedSnapshot(snapshot);
return Object.fromEntries(appliedEntries);
});
}
async getRevision() {
@@ -608,13 +781,16 @@ export class OpfsGraphStore {
}
async commitDelta(delta = {}, options = {}) {
const nowMs = Date.now();
const normalizedDelta =
delta && typeof delta === "object" && !Array.isArray(delta) ? delta : {};
const currentSnapshot = await this._loadSnapshot();
const nodeMap = new Map();
const edgeMap = new Map();
const tombstoneMap = new Map();
return await this._runSerializedWrite(
String(options?.reason || "commitDelta"),
async () => {
const nowMs = Date.now();
const normalizedDelta =
delta && typeof delta === "object" && !Array.isArray(delta) ? delta : {};
const currentSnapshot = await this._loadSnapshot({ awaitWrites: false });
const nodeMap = new Map();
const edgeMap = new Map();
const tombstoneMap = new Map();
for (const node of sanitizeSnapshotRecordArray(currentSnapshot.nodes)) {
const id = normalizeRecordId(node.id);
@@ -721,31 +897,33 @@ export class OpfsGraphStore {
? Number(runtimeMetaPatch.extractionCount)
: currentSnapshot.state.extractionCount,
};
const nextSnapshot = {
meta: nextMeta,
state: nextState,
nodes: Array.from(nodeMap.values()),
edges: Array.from(edgeMap.values()),
tombstones: Array.from(tombstoneMap.values()),
};
await this._writeResolvedSnapshot(nextSnapshot);
const nextSnapshot = {
meta: nextMeta,
state: nextState,
nodes: Array.from(nodeMap.values()),
edges: Array.from(edgeMap.values()),
tombstones: Array.from(tombstoneMap.values()),
};
await this._writeResolvedSnapshot(nextSnapshot);
return {
revision: nextRevision,
lastModified: nowMs,
imported: {
nodes: nextSnapshot.nodes.length,
edges: nextSnapshot.edges.length,
tombstones: nextSnapshot.tombstones.length,
return {
revision: nextRevision,
lastModified: nowMs,
imported: {
nodes: nextSnapshot.nodes.length,
edges: nextSnapshot.edges.length,
tombstones: nextSnapshot.tombstones.length,
},
delta: {
upsertNodes: upsertNodes.length,
upsertEdges: upsertEdges.length,
deleteNodeIds: deleteNodeIds.length,
deleteEdgeIds: deleteEdgeIds.length,
tombstones: tombstones.length,
},
};
},
delta: {
upsertNodes: upsertNodes.length,
upsertEdges: upsertEdges.length,
deleteNodeIds: deleteNodeIds.length,
deleteEdgeIds: deleteEdgeIds.length,
tombstones: tombstones.length,
},
};
);
}
async bulkUpsertNodes(nodes = []) {
@@ -990,140 +1168,216 @@ export class OpfsGraphStore {
}
async importSnapshot(snapshot, options = {}) {
const normalizedSnapshot = sanitizeSnapshot(snapshot);
const mode = normalizeMode(options.mode);
const shouldMarkSyncDirty = options.markSyncDirty !== false;
const nowMs = Date.now();
const currentSnapshot = await this._loadSnapshot();
const nextSnapshot =
mode === "replace"
? normalizedSnapshot
: {
meta: {
...currentSnapshot.meta,
...normalizedSnapshot.meta,
},
state: {
...currentSnapshot.state,
...normalizedSnapshot.state,
},
nodes: mergeSnapshotRecords(currentSnapshot.nodes, normalizedSnapshot.nodes),
edges: mergeSnapshotRecords(currentSnapshot.edges, normalizedSnapshot.edges),
tombstones: mergeSnapshotRecords(
currentSnapshot.tombstones,
normalizedSnapshot.tombstones,
),
};
const currentRevision = normalizeRevision(currentSnapshot.meta?.revision);
const incomingRevision = normalizeRevision(normalizedSnapshot.meta?.revision);
const explicitRevision = normalizeRevision(options.revision);
const requestedRevision = Number.isFinite(Number(options.revision))
? explicitRevision
: options.preserveRevision
? incomingRevision
: currentRevision + 1;
const nextRevision = Math.max(currentRevision + 1, requestedRevision);
nextSnapshot.meta = {
...nextSnapshot.meta,
chatId: this.chatId,
revision: nextRevision,
lastModified: nowMs,
lastMutationReason: "importSnapshot",
syncDirty: shouldMarkSyncDirty,
syncDirtyReason: "importSnapshot",
storagePrimary: OPFS_STORE_KIND,
storageMode: this.storeMode,
};
nextSnapshot.state = {
...nextSnapshot.state,
lastProcessedFloor: Number.isFinite(Number(nextSnapshot?.state?.lastProcessedFloor))
? Number(nextSnapshot.state.lastProcessedFloor)
: Number.isFinite(Number(nextSnapshot?.meta?.lastProcessedFloor))
? Number(nextSnapshot.meta.lastProcessedFloor)
: META_DEFAULT_LAST_PROCESSED_FLOOR,
extractionCount: Number.isFinite(Number(nextSnapshot?.state?.extractionCount))
? Number(nextSnapshot.state.extractionCount)
: Number.isFinite(Number(nextSnapshot?.meta?.extractionCount))
? Number(nextSnapshot.meta.extractionCount)
: META_DEFAULT_EXTRACTION_COUNT,
};
await this._writeResolvedSnapshot(nextSnapshot);
return await this._runSerializedWrite("importSnapshot", async () => {
const normalizedSnapshot = sanitizeSnapshot(snapshot);
const mode = normalizeMode(options.mode);
const shouldMarkSyncDirty = options.markSyncDirty !== false;
const nowMs = Date.now();
const currentSnapshot = await this._loadSnapshot({ awaitWrites: false });
const nextSnapshot =
mode === "replace"
? normalizedSnapshot
: {
meta: {
...currentSnapshot.meta,
...normalizedSnapshot.meta,
},
state: {
...currentSnapshot.state,
...normalizedSnapshot.state,
},
nodes: mergeSnapshotRecords(currentSnapshot.nodes, normalizedSnapshot.nodes),
edges: mergeSnapshotRecords(currentSnapshot.edges, normalizedSnapshot.edges),
tombstones: mergeSnapshotRecords(
currentSnapshot.tombstones,
normalizedSnapshot.tombstones,
),
};
const currentRevision = normalizeRevision(currentSnapshot.meta?.revision);
const incomingRevision = normalizeRevision(normalizedSnapshot.meta?.revision);
const explicitRevision = normalizeRevision(options.revision);
const requestedRevision = Number.isFinite(Number(options.revision))
? explicitRevision
: options.preserveRevision
? incomingRevision
: currentRevision + 1;
const nextRevision = Math.max(currentRevision + 1, requestedRevision);
nextSnapshot.meta = {
...nextSnapshot.meta,
chatId: this.chatId,
revision: nextRevision,
lastModified: nowMs,
lastMutationReason: "importSnapshot",
syncDirty: shouldMarkSyncDirty,
syncDirtyReason: "importSnapshot",
storagePrimary: OPFS_STORE_KIND,
storageMode: this.storeMode,
};
nextSnapshot.state = {
...nextSnapshot.state,
lastProcessedFloor: Number.isFinite(Number(nextSnapshot?.state?.lastProcessedFloor))
? Number(nextSnapshot.state.lastProcessedFloor)
: Number.isFinite(Number(nextSnapshot?.meta?.lastProcessedFloor))
? Number(nextSnapshot.meta.lastProcessedFloor)
: META_DEFAULT_LAST_PROCESSED_FLOOR,
extractionCount: Number.isFinite(Number(nextSnapshot?.state?.extractionCount))
? Number(nextSnapshot.state.extractionCount)
: Number.isFinite(Number(nextSnapshot?.meta?.extractionCount))
? Number(nextSnapshot.meta.extractionCount)
: META_DEFAULT_EXTRACTION_COUNT,
};
await this._writeResolvedSnapshot(nextSnapshot);
return {
mode,
revision: nextRevision,
imported: {
nodes: nextSnapshot.nodes.length,
edges: nextSnapshot.edges.length,
tombstones: nextSnapshot.tombstones.length,
},
};
return {
mode,
revision: nextRevision,
imported: {
nodes: nextSnapshot.nodes.length,
edges: nextSnapshot.edges.length,
tombstones: nextSnapshot.tombstones.length,
},
};
});
}
async clearAll() {
const currentRevision = await this.getRevision();
const nextRevision = currentRevision + 1;
await this._writeResolvedSnapshot({
meta: {
return await this._runSerializedWrite("clearAll", async () => {
const currentRevision = normalizeRevision(
(await this._readManifest({ awaitWrites: false }))?.meta?.revision,
);
const nextRevision = currentRevision + 1;
await this._writeResolvedSnapshot({
meta: {
revision: nextRevision,
lastModified: Date.now(),
lastMutationReason: "clearAll",
syncDirty: true,
syncDirtyReason: "clearAll",
storagePrimary: OPFS_STORE_KIND,
storageMode: this.storeMode,
},
state: {
lastProcessedFloor: META_DEFAULT_LAST_PROCESSED_FLOOR,
extractionCount: META_DEFAULT_EXTRACTION_COUNT,
},
nodes: [],
edges: [],
tombstones: [],
});
return {
cleared: true,
revision: nextRevision,
lastModified: Date.now(),
lastMutationReason: "clearAll",
syncDirty: true,
syncDirtyReason: "clearAll",
storagePrimary: OPFS_STORE_KIND,
storageMode: this.storeMode,
},
state: {
lastProcessedFloor: META_DEFAULT_LAST_PROCESSED_FLOOR,
extractionCount: META_DEFAULT_EXTRACTION_COUNT,
},
nodes: [],
edges: [],
tombstones: [],
};
});
return {
cleared: true,
revision: nextRevision,
};
}
async pruneExpiredTombstones(nowMs = Date.now()) {
const normalizedNow = normalizeTimestamp(nowMs, Date.now());
const cutoffMs = normalizedNow - BME_TOMBSTONE_RETENTION_MS;
const snapshot = await this._loadSnapshot();
const nextTombstones = snapshot.tombstones.filter(
(item) => normalizeTimestamp(item?.deletedAt, 0) >= cutoffMs,
return await this._runSerializedWrite(
"pruneExpiredTombstones",
async () => {
const normalizedNow = normalizeTimestamp(nowMs, Date.now());
const cutoffMs = normalizedNow - BME_TOMBSTONE_RETENTION_MS;
const snapshot = await this._loadSnapshot({ awaitWrites: false });
const nextTombstones = snapshot.tombstones.filter(
(item) => normalizeTimestamp(item?.deletedAt, 0) >= cutoffMs,
);
const removedCount = snapshot.tombstones.length - nextTombstones.length;
if (removedCount <= 0) {
return {
pruned: 0,
revision: normalizeRevision(snapshot.meta?.revision),
cutoffMs,
};
}
const nextRevision = normalizeRevision(snapshot.meta?.revision) + 1;
await this._writeResolvedSnapshot({
meta: {
...snapshot.meta,
revision: nextRevision,
lastModified: normalizedNow,
lastMutationReason: "pruneExpiredTombstones",
syncDirty: true,
syncDirtyReason: "pruneExpiredTombstones",
storagePrimary: OPFS_STORE_KIND,
storageMode: this.storeMode,
},
state: snapshot.state,
nodes: snapshot.nodes,
edges: snapshot.edges,
tombstones: nextTombstones,
});
return {
pruned: removedCount,
revision: nextRevision,
cutoffMs,
};
},
);
const removedCount = snapshot.tombstones.length - nextTombstones.length;
if (removedCount <= 0) {
return {
pruned: 0,
revision: normalizeRevision(snapshot.meta?.revision),
cutoffMs,
};
}
async _recoverManifestFromDirectory(chatDirectory, manifest = null) {
const fileNames = await listDirectoryFileNames(chatDirectory);
const coreCandidates = fileNames
.map((name) => parseSnapshotFilenameCandidate(name, OPFS_CORE_FILENAME_PREFIX))
.filter(Boolean);
const auxCandidates = fileNames
.map((name) => parseSnapshotFilenameCandidate(name, OPFS_AUX_FILENAME_PREFIX))
.filter(Boolean);
if (!coreCandidates.length || !auxCandidates.length) {
return null;
}
const nextRevision = normalizeRevision(snapshot.meta?.revision) + 1;
await this._writeResolvedSnapshot({
const coreByRevision = new Map();
const auxByRevision = new Map();
for (const candidate of coreCandidates) {
const current = coreByRevision.get(candidate.revision) || null;
if (!current || candidate.stampMs > current.stampMs) {
coreByRevision.set(candidate.revision, candidate);
}
}
for (const candidate of auxCandidates) {
const current = auxByRevision.get(candidate.revision) || null;
if (!current || candidate.stampMs > current.stampMs) {
auxByRevision.set(candidate.revision, candidate);
}
}
const candidateRevisions = Array.from(coreByRevision.keys())
.filter((revision) => auxByRevision.has(revision))
.sort((left, right) => right - left);
if (!candidateRevisions.length) {
return null;
}
const recoveredRevision = candidateRevisions[0];
const recoveredCore = coreByRevision.get(recoveredRevision);
const recoveredAux = auxByRevision.get(recoveredRevision);
if (!recoveredCore || !recoveredAux) {
return null;
}
const nextManifest = {
...(manifest || {}),
version: OPFS_MANIFEST_VERSION,
chatId: this.chatId,
storeKind: OPFS_STORE_KIND,
storeMode: this.storeMode,
activeCoreFilename: recoveredCore.filename,
activeAuxFilename: recoveredAux.filename,
meta: {
...snapshot.meta,
revision: nextRevision,
lastModified: normalizedNow,
lastMutationReason: "pruneExpiredTombstones",
syncDirty: true,
syncDirtyReason: "pruneExpiredTombstones",
...createDefaultMetaValues(this.chatId),
...(manifest?.meta && typeof manifest.meta === "object" && !Array.isArray(manifest.meta)
? toPlainData(manifest.meta, {})
: {}),
revision: recoveredRevision,
chatId: this.chatId,
storagePrimary: OPFS_STORE_KIND,
storageMode: this.storeMode,
},
state: snapshot.state,
nodes: snapshot.nodes,
edges: snapshot.edges,
tombstones: nextTombstones,
});
return {
pruned: removedCount,
revision: nextRevision,
cutoffMs,
};
await writeJsonFile(chatDirectory, OPFS_MANIFEST_FILENAME, nextManifest);
this._manifestCache = nextManifest;
return nextManifest;
}
async _getChatDirectory() {
@@ -1150,8 +1404,8 @@ export class OpfsGraphStore {
return await this._chatDirectoryPromise;
}
async _ensureManifest() {
const existingManifest = await this._readManifest();
async _ensureManifest(options = {}) {
const existingManifest = await this._readManifest(options);
if (existingManifest) {
return existingManifest;
}
@@ -1172,7 +1426,10 @@ export class OpfsGraphStore {
return manifest;
}
async _readManifest() {
async _readManifest({ awaitWrites = true } = {}) {
if (awaitWrites) {
await this._awaitPendingWrites();
}
if (this._manifestCache) {
return this._manifestCache;
}
@@ -1209,21 +1466,71 @@ export class OpfsGraphStore {
return manifest;
}
async _loadSnapshot() {
const manifest = await this._ensureManifest();
async _loadSnapshot({ awaitWrites = true } = {}) {
if (awaitWrites) {
await this._awaitPendingWrites();
}
let manifest = await this._ensureManifest({
awaitWrites: false,
});
const chatDirectory = await this._getChatDirectory();
const corePayload = manifest.activeCoreFilename
? await readJsonFile(chatDirectory, manifest.activeCoreFilename, {})
: {};
const auxPayload = manifest.activeAuxFilename
? await readJsonFile(chatDirectory, manifest.activeAuxFilename, {})
: {};
const activeCoreRevision = parseSnapshotFilenameCandidate(
manifest?.activeCoreFilename,
OPFS_CORE_FILENAME_PREFIX,
)?.revision;
const activeAuxRevision = parseSnapshotFilenameCandidate(
manifest?.activeAuxFilename,
OPFS_AUX_FILENAME_PREFIX,
)?.revision;
let shouldRecoverManifest =
Boolean(manifest?.activeCoreFilename) &&
Boolean(manifest?.activeAuxFilename) &&
Number.isFinite(activeCoreRevision) &&
Number.isFinite(activeAuxRevision) &&
activeCoreRevision !== activeAuxRevision;
let corePayload = {};
let auxPayload = {};
try {
corePayload = manifest.activeCoreFilename
? await readJsonFile(chatDirectory, manifest.activeCoreFilename, null)
: {};
auxPayload = manifest.activeAuxFilename
? await readJsonFile(chatDirectory, manifest.activeAuxFilename, null)
: {};
if (
(manifest.activeCoreFilename && !corePayload) ||
(manifest.activeAuxFilename && !auxPayload)
) {
shouldRecoverManifest = true;
}
} catch {
shouldRecoverManifest = true;
}
if (shouldRecoverManifest) {
const recoveredManifest = await this._recoverManifestFromDirectory(
chatDirectory,
manifest,
);
if (!recoveredManifest) {
throw new Error("opfs-manifest-snapshot-mismatch");
}
manifest = recoveredManifest;
corePayload = manifest.activeCoreFilename
? await readJsonFile(chatDirectory, manifest.activeCoreFilename, {})
: {};
auxPayload = manifest.activeAuxFilename
? await readJsonFile(chatDirectory, manifest.activeAuxFilename, {})
: {};
}
return buildSnapshotFromStoredParts(manifest, corePayload, auxPayload);
}
async _writeResolvedSnapshot(snapshot) {
const chatDirectory = await this._getChatDirectory();
const previousManifest = await this._ensureManifest();
const previousManifest = await this._ensureManifest({
awaitWrites: false,
});
const normalizedSnapshot = sanitizeSnapshot(snapshot);
const state = normalizeSnapshotState(normalizedSnapshot);
const writeStamp = Date.now();