fix(migration): 7 critical fixes for Authority migration safety and data integrity

- Fix #2: _executeStatements fallback now batches transactions (150/batch)
  and reorders upsert-before-delete to prevent data loss on payload overflow
- Fix #3: Read/write migratedToAuthority marker in chat_metadata to prevent
  re-overwriting from legacy sources after Authority migration
- Fix #1: Add OPFS → Authority migration channel (exportOpfsSnapshotForChat,
  maybeImportLegacyOpfsSnapshotToLocalStore) inserted before IndexedDB in
  migration chain
- Fix #4: Mark runtimeVectorIndexState dirty with triviumRebuildRequired
  and trigger submitAuthorityVectorRebuildJob after all three migration paths
- Fix #5: Dual-save safety snapshots to Authority blob; rollback can now
  recover from blob when local IndexedDB snapshot is unavailable
- Fix #6: Add isEmptyCheck detail and console.warn for near-empty stores
  to help diagnose residual vs real data in store-not-empty skips
- Fix #7: Add overflow warning logs and sessionStorage persistence for
  Authority offline queue max-items/max-bytes exceeded events
This commit is contained in:
Youzini-afk
2026-04-29 01:15:37 +08:00
parent 79fbd369ba
commit d702e267d3
5 changed files with 574 additions and 11 deletions

View File

@@ -144,6 +144,18 @@ export function enqueueAuthorityOfflineMutation(state = {}, mutation = {}, setti
const nextItems = [...current.offlineQueue, item];
const nextSummary = summarizeQueue(nextItems);
if (policy.maxItems > 0 && nextSummary.items > policy.maxItems) {
console.warn(
`[ST-BME] Authority 离线队列溢出 (maxItems=${policy.maxItems}),新突变被丢弃。` +
"恢复连接后请手动同步图谱。",
{ rejectedMutation: item },
);
try {
sessionStorage.setItem("st_bme:authority:overflow:global", JSON.stringify({
overflowAt: new Date(nowMs).toISOString(),
reason: "max-items-exceeded",
lostItemCount: 1,
}));
} catch {}
const nextState = createAuthorityBrowserState({
...current,
offlineQueueOverflow: true,
@@ -153,6 +165,18 @@ export function enqueueAuthorityOfflineMutation(state = {}, mutation = {}, setti
return { accepted: false, reason: "max-items-exceeded", state: nextState };
}
if (policy.maxBytes > 0 && nextSummary.bytes > policy.maxBytes) {
console.warn(
`[ST-BME] Authority 离线队列溢出 (maxBytes=${policy.maxBytes}),新突变被丢弃。` +
"恢复连接后请手动同步图谱。",
{ rejectedMutation: item },
);
try {
sessionStorage.setItem("st_bme:authority:overflow:global", JSON.stringify({
overflowAt: new Date(nowMs).toISOString(),
reason: "max-bytes-exceeded",
lostItemCount: 1,
}));
} catch {}
const nextState = createAuthorityBrowserState({
...current,
offlineQueueOverflow: true,
@@ -176,6 +200,9 @@ export function enqueueAuthorityOfflineMutation(state = {}, mutation = {}, setti
export function clearAuthorityOfflineQueue(state = {}, settings = {}, nowMs = Date.now()) {
const current = normalizeAuthorityBrowserState(state, settings, nowMs);
try {
sessionStorage.removeItem("st_bme:authority:overflow:global");
} catch {}
return createAuthorityBrowserState({
...current,
offlineQueue: [],

View File

@@ -742,6 +742,12 @@ export class AuthorityGraphStore {
edges: emptyStatus.edges,
tombstones: emptyStatus.tombstones,
},
isEmptyCheck: {
empty: false,
nodes: emptyStatus.nodes,
edges: emptyStatus.edges,
tombstones: emptyStatus.tombstones,
},
migrationCompletedAt: 0,
migrationSource,
legacyRetentionUntil,
@@ -1032,12 +1038,42 @@ export class AuthorityGraphStore {
async _executeStatements(statements = []) {
const normalizedStatements = toArray(statements).filter((statement) => statement?.sql);
if (!normalizedStatements.length) return null;
const BATCH_SIZE = 150;
if (typeof this.sqlClient?.transaction === "function") {
return await this.sqlClient.transaction(normalizedStatements);
if (normalizedStatements.length <= BATCH_SIZE) {
return await this.sqlClient.transaction(normalizedStatements);
}
let lastResult = null;
for (let i = 0; i < normalizedStatements.length; i += BATCH_SIZE) {
const batch = normalizedStatements.slice(i, i + BATCH_SIZE);
lastResult = await this.sqlClient.transaction(batch);
}
return lastResult;
}
const upsertStatements = [];
const deleteStatements = [];
for (const stmt of normalizedStatements) {
if (stmt.sql.trim().toUpperCase().startsWith("DELETE")) {
deleteStatements.push(stmt);
} else {
upsertStatements.push(stmt);
}
}
let result = null;
for (const statement of normalizedStatements) {
result = await this._execute(statement.sql, statement.params || {});
for (const stmt of upsertStatements) {
result = await this._execute(stmt.sql, stmt.params || {});
}
for (const stmt of deleteStatements) {
result = await this._execute(stmt.sql, stmt.params || {});
}
if (deleteStatements.length > 0 && upsertStatements.length > 0) {
console.warn("[ST-BME] _executeStatements fallback 路径执行:先 upsert 后 delete无事务保护", {
chatId: this.chatId,
upsertCount: upsertStatements.length,
deleteCount: deleteStatements.length,
});
}
return result;
}

View File

@@ -1320,6 +1320,12 @@ class LegacyOpfsGraphStore {
edges: emptyStatus.edges,
tombstones: emptyStatus.tombstones,
},
isEmptyCheck: {
empty: false,
nodes: emptyStatus.nodes,
edges: emptyStatus.edges,
tombstones: emptyStatus.tombstones,
},
migrationCompletedAt: 0,
migrationSource,
legacyRetentionUntil,
@@ -2707,6 +2713,12 @@ export class OpfsGraphStore {
edges: emptyStatus.edges,
tombstones: emptyStatus.tombstones,
},
isEmptyCheck: {
empty: false,
nodes: emptyStatus.nodes,
edges: emptyStatus.edges,
tombstones: emptyStatus.tombstones,
},
migrationCompletedAt: 0,
migrationSource,
legacyRetentionUntil,

View File

@@ -1156,6 +1156,50 @@ export async function rollbackFromRestoreSafetySnapshot(chatId, options = {}) {
try {
const status = await getRestoreSafetySnapshotStatus(normalizedChatId, options);
if (!status.exists) {
try {
const blobAdapter = getAuthorityBlobAdapter(options);
if (blobAdapter && typeof blobAdapter.readJson === "function") {
const blobSnapshot = await blobAdapter.readJson(
`ST-BME/migration-safety/${normalizedChatId}.json`,
{ namespace: "st-bme-safety" },
);
if (blobSnapshot && typeof blobSnapshot === "object" && !Array.isArray(blobSnapshot)) {
const snapshot = normalizeSyncSnapshot(blobSnapshot, normalizedChatId);
const hasNodes = Array.isArray(snapshot.nodes) && snapshot.nodes.length > 0;
const hasEdges = Array.isArray(snapshot.edges) && snapshot.edges.length > 0;
if (hasNodes || hasEdges) {
const db = await getDb(normalizedChatId, options);
await db.importSnapshot(snapshot, {
mode: "replace",
preserveRevision: true,
revision: normalizeRevision(snapshot.meta?.revision),
markSyncDirty: false,
});
await patchDbMeta(db, {
deviceId: getOrCreateDeviceId(),
syncDirty: true,
syncDirtyReason: "restore-safety-rollback-from-blob",
lastBackupRollbackAt: Date.now(),
});
await invokeSyncAppliedHook(options, {
chatId: normalizedChatId,
action: "restore-backup-from-blob",
revision: normalizeRevision(snapshot.meta?.revision),
});
return {
restored: true,
chatId: normalizedChatId,
revision: normalizeRevision(snapshot.meta?.revision),
createdAt: 0,
blobRestored: true,
reason: "restored-from-authority-blob",
};
}
}
}
} catch (blobError) {
console.warn("[ST-BME] 从 Authority blob 恢复安全快照失败:", blobError);
}
return {
restored: false,
chatId: normalizedChatId,