feat(authority): migrate local graphs to server primary

This commit is contained in:
Youzini-afk
2026-04-28 03:02:43 +08:00
parent dc37d22dcf
commit 3f70d63a86
5 changed files with 715 additions and 42 deletions

View File

@@ -111,6 +111,7 @@ import {
createAuthorityBrowserState,
getAuthorityBrowserStateSnapshot,
normalizeAuthorityBrowserState,
recordAuthorityAcceptedRevision,
} from "../sync/authority-browser-state.js";
import {
AUTHORITY_GRAPH_STORE_KIND,
@@ -323,6 +324,173 @@ async function createGraphPersistenceHarness({
indexedDbSnapshotMap.set(normalizedChatId, structuredClone(snapshot));
}
const authoritySnapshotMap = new Map();
function getAuthoritySnapshotForChat(targetChatId = "") {
const normalizedChatId = String(targetChatId || "");
if (normalizedChatId && authoritySnapshotMap.has(normalizedChatId)) {
return structuredClone(authoritySnapshotMap.get(normalizedChatId));
}
return {
meta: {
revision: 0,
chatId: normalizedChatId,
storagePrimary: AUTHORITY_GRAPH_STORE_KIND,
storageMode: AUTHORITY_GRAPH_STORE_MODE,
},
nodes: [],
edges: [],
tombstones: [],
state: { lastProcessedFloor: -1, extractionCount: 0 },
};
}
function setAuthoritySnapshotForChat(targetChatId = "", snapshot = null) {
const normalizedChatId = String(targetChatId || "");
if (!normalizedChatId) return;
if (!snapshot) {
authoritySnapshotMap.delete(normalizedChatId);
return;
}
authoritySnapshotMap.set(normalizedChatId, structuredClone(snapshot));
}
class HarnessAuthorityGraphStore {
constructor(dbChatId = "") {
this.chatId = String(dbChatId || "");
this.storeKind = AUTHORITY_GRAPH_STORE_KIND;
this.storeMode = AUTHORITY_GRAPH_STORE_MODE;
}
async open() {}
async close() {}
getStorageDiagnosticsSync() {
return {
formatVersion: 1,
migrationState: "idle",
resolvedStoreMode: AUTHORITY_GRAPH_STORE_MODE,
storageKind: AUTHORITY_GRAPH_STORE_KIND,
browserCacheMode: "minimal",
};
}
async exportSnapshot() {
return getAuthoritySnapshotForChat(this.chatId);
}
async exportSnapshotProbe() {
return getAuthoritySnapshotForChat(this.chatId);
}
async importSnapshot(snapshot = {}, options = {}) {
const current = getAuthoritySnapshotForChat(this.chatId);
const currentRevision = Number(current?.meta?.revision || 0);
const incomingRevision = Number(snapshot?.meta?.revision || 0);
const requestedRevision = Number.isFinite(Number(options?.revision))
? Math.max(0, Math.floor(Number(options.revision)))
: options?.preserveRevision === true
? Math.max(0, Math.floor(incomingRevision))
: currentRevision + 1;
const nextRevision = Math.max(currentRevision + 1, requestedRevision);
const nowMs = Date.now();
const nodes = Array.isArray(snapshot?.nodes)
? snapshot.nodes.map((node) => structuredClone(node))
: [];
const edges = Array.isArray(snapshot?.edges)
? snapshot.edges.map((edge) => structuredClone(edge))
: [];
const tombstones = Array.isArray(snapshot?.tombstones)
? snapshot.tombstones.map((record) => structuredClone(record))
: [];
const nextSnapshot = {
meta: {
...(snapshot?.meta && typeof snapshot.meta === "object"
? structuredClone(snapshot.meta)
: {}),
chatId: this.chatId,
storagePrimary: AUTHORITY_GRAPH_STORE_KIND,
storageMode: AUTHORITY_GRAPH_STORE_MODE,
revision: nextRevision,
lastModified: nowMs,
lastMutationReason: "importSnapshot",
syncDirty: options?.markSyncDirty !== false,
syncDirtyReason: options?.markSyncDirty === false ? "" : "importSnapshot",
nodeCount: nodes.length,
edgeCount: edges.length,
tombstoneCount: tombstones.length,
},
nodes,
edges,
tombstones,
state:
snapshot?.state && typeof snapshot.state === "object"
? structuredClone(snapshot.state)
: { lastProcessedFloor: -1, extractionCount: 0 },
};
setAuthoritySnapshotForChat(this.chatId, nextSnapshot);
return {
mode: String(options?.mode || "replace"),
revision: nextRevision,
imported: {
nodes: nodes.length,
edges: edges.length,
tombstones: tombstones.length,
},
};
}
async importLegacyGraph(graph, options = {}) {
const revision = Math.max(1, Math.floor(Number(options?.revision) || 1));
const snapshot = buildSnapshotFromGraph(graph, {
chatId: this.chatId,
revision,
meta: {
migrationCompletedAt: Number(options?.nowMs || Date.now()),
migrationSource: String(options?.source || "chat_metadata"),
},
});
const importResult = await this.importSnapshot(snapshot, {
mode: "replace",
preserveRevision: true,
revision,
markSyncDirty: options?.markSyncDirty,
});
return {
migrated: true,
revision: importResult.revision,
imported: importResult.imported,
};
}
async isEmpty() {
const snapshot = getAuthoritySnapshotForChat(this.chatId);
const nodes = Array.isArray(snapshot?.nodes) ? snapshot.nodes.length : 0;
const edges = Array.isArray(snapshot?.edges) ? snapshot.edges.length : 0;
const tombstones = Array.isArray(snapshot?.tombstones)
? snapshot.tombstones.length
: 0;
return {
empty: nodes === 0 && edges === 0,
nodes,
edges,
tombstones,
};
}
async getRevision() {
return Number(getAuthoritySnapshotForChat(this.chatId)?.meta?.revision || 0);
}
async getMeta(key, fallbackValue = 0) {
const snapshot = getAuthoritySnapshotForChat(this.chatId);
if (!snapshot?.meta || !(key in snapshot.meta)) {
return fallbackValue;
}
return snapshot.meta[key];
}
async patchMeta(record = {}) {
const snapshot = getAuthoritySnapshotForChat(this.chatId);
snapshot.meta = {
...(snapshot.meta || {}),
...(record && typeof record === "object" ? structuredClone(record) : {}),
};
setAuthoritySnapshotForChat(this.chatId, snapshot);
return record;
}
}
function commitIndexedDbDelta(targetChatId = "", delta = {}, options = {}) {
const normalizedChatId = String(targetChatId || "");
const currentSnapshot = getIndexedDbSnapshotForChat(normalizedChatId);
@@ -442,6 +610,9 @@ async function createGraphPersistenceHarness({
String(chatId || globalChatId || ""),
),
__indexedDbSnapshots: indexedDbSnapshotMap,
__authoritySnapshots: authoritySnapshotMap,
__getAuthoritySnapshotForChat: getAuthoritySnapshotForChat,
__setAuthoritySnapshotForChat: setAuthoritySnapshotForChat,
sessionStorage: storage,
localStorage,
extension_settings: {
@@ -457,9 +628,10 @@ async function createGraphPersistenceHarness({
createAuthorityBrowserState,
getAuthorityBrowserStateSnapshot,
normalizeAuthorityBrowserState,
recordAuthorityAcceptedRevision,
AUTHORITY_GRAPH_STORE_KIND,
AUTHORITY_GRAPH_STORE_MODE,
AuthorityGraphStore,
AuthorityGraphStore: HarnessAuthorityGraphStore,
migrateLegacyTaskProfiles(settings = {}) {
return {
taskProfilesVersion: Number(settings?.taskProfilesVersion || 0),
@@ -1071,6 +1243,32 @@ async function createGraphPersistenceHarness({
hasGraphPersistDirtyState,
pruneGraphPersistDirtyState,
buildBmeDbName,
buildRestoreSafetyChatId(chatId = "") {
return `__restore_safety__${String(chatId || "").trim()}`;
},
async createRestoreSafetySnapshot(chatId, snapshot, options = {}) {
const safetyDb =
typeof options?.getSafetyDb === "function"
? await options.getSafetyDb(chatId)
: new runtimeContext.BmeDatabase(
runtimeContext.buildRestoreSafetyChatId(chatId),
);
await safetyDb.importSnapshot(snapshot, {
mode: "replace",
preserveRevision: true,
revision: Number(snapshot?.meta?.revision || 0),
markSyncDirty: false,
});
if (typeof safetyDb.patchMeta === "function") {
await safetyDb.patchMeta({
restoreSafetySnapshotExists: true,
restoreSafetySnapshotChatId: String(chatId || "").trim(),
});
}
if (typeof safetyDb.close === "function") {
await safetyDb.close();
}
},
BME_GRAPH_LOCAL_STORAGE_MODE_AUTO: "auto",
BME_GRAPH_LOCAL_STORAGE_MODE_INDEXEDDB: "indexeddb",
BME_GRAPH_LOCAL_STORAGE_MODE_OPFS_PRIMARY: "opfs-primary",
@@ -1154,10 +1352,78 @@ async function createGraphPersistenceHarness({
revision: Number(snapshot?.meta?.revision) || 0,
};
}
async patchMeta(record = {}) {
const snapshot = getIndexedDbSnapshotForChat(this.chatId);
snapshot.meta = {
...(snapshot.meta || {}),
...(record && typeof record === "object" ? structuredClone(record) : {}),
};
setIndexedDbSnapshotForChat(this.chatId, snapshot);
return record;
}
async getMeta(key, fallbackValue = 0) {
const snapshot = getIndexedDbSnapshotForChat(this.chatId) || {};
if (!snapshot?.meta || !(key in snapshot.meta)) {
return fallbackValue;
}
return snapshot.meta[key];
}
async getRevision() {
const snapshot = getIndexedDbSnapshotForChat(this.chatId) || {};
return Number(snapshot?.meta?.revision) || 0;
}
async isEmpty() {
const snapshot = getIndexedDbSnapshotForChat(this.chatId) || {};
const nodes = Array.isArray(snapshot?.nodes) ? snapshot.nodes.length : 0;
const edges = Array.isArray(snapshot?.edges) ? snapshot.edges.length : 0;
const tombstones = Array.isArray(snapshot?.tombstones)
? snapshot.tombstones.length
: 0;
return {
empty: nodes === 0 && edges === 0,
nodes,
edges,
tombstones,
};
}
async importLegacyGraph(graph, options = {}) {
const revision = Number(options?.revision) || 1;
const migratedSnapshot = buildSnapshotFromGraph(graph, {
chatId: this.chatId || runtimeContext.__chatContext?.chatId || "",
revision,
meta: {
migrationCompletedAt: Date.now(),
migrationSource: "chat_metadata",
},
});
setIndexedDbSnapshotForChat(this.chatId, migratedSnapshot);
return {
migrated: true,
revision,
imported: {
nodes: migratedSnapshot?.nodes?.length || 0,
edges: migratedSnapshot?.edges?.length || 0,
tombstones: migratedSnapshot?.tombstones?.length || 0,
},
};
}
async markSyncDirty() {
if (runtimeContext.__markSyncDirtyShouldThrow) {
throw new Error("mark-sync-dirty-failed");
}
}
},
BmeChatManager: class {
constructor() {
constructor(options = {}) {
this._currentChatId = "";
this._databaseFactory =
typeof options?.databaseFactory === "function"
? options.databaseFactory
: null;
this._selectorKeyResolver =
typeof options?.selectorKeyResolver === "function"
? options.selectorKeyResolver
: null;
}
_createDb(dbChatId = "") {
return {
@@ -1249,6 +1515,16 @@ async function createGraphPersistenceHarness({
if (runtimeContext.__indexedDbGetCurrentDbShouldThrow) {
throw new Error("indexeddb-get-current-db-failed");
}
const selectorKey = this._selectorKeyResolver
? String(await this._selectorKeyResolver(this._currentChatId) || "")
: "";
if (this._databaseFactory && selectorKey.startsWith("authority:")) {
const db = await this._databaseFactory(this._currentChatId);
if (typeof db?.open === "function") {
await db.open();
}
return db;
}
return this._createDb(this._currentChatId);
}
async switchChat(dbChatId = "") {
@@ -1256,6 +1532,16 @@ async function createGraphPersistenceHarness({
runtimeContext.__indexedDbSnapshot = getIndexedDbSnapshotForChat(
this._currentChatId,
);
const selectorKey = this._selectorKeyResolver
? String(await this._selectorKeyResolver(this._currentChatId) || "")
: "";
if (this._databaseFactory && selectorKey.startsWith("authority:")) {
const db = await this._databaseFactory(this._currentChatId);
if (typeof db?.open === "function") {
await db.open();
}
return db;
}
return this._createDb(this._currentChatId);
}
async closeCurrent() {}
@@ -1327,6 +1613,20 @@ result = {
};
return bmeLocalStoreCapabilitySnapshot;
},
setAuthorityCapabilityState(patch = {}) {
authorityCapabilityState = normalizeAuthorityCapabilityState(
{
...authorityCapabilityState,
...(patch || {}),
},
getSettings(),
);
authorityBrowserState = normalizeAuthorityBrowserState(
authorityBrowserState,
getSettings(),
);
return authorityCapabilityState;
},
setChatContext(nextContext) {
globalThis.__chatContext = nextContext;
return globalThis.__chatContext;
@@ -1362,6 +1662,9 @@ result = {
const snapshot = globalThis.__indexedDbSnapshots.get(normalizedChatId);
return snapshot ? structuredClone(snapshot) : null;
},
getAuthoritySnapshotForChat(chatId) {
return globalThis.__getAuthoritySnapshotForChat(chatId);
},
};
`,
].join("\n"),
@@ -3642,6 +3945,92 @@ result = {
);
}
{
const chatId = "meta-authority-indexeddb-migration";
const legacyGraph = stampPersistedGraph(
createMeaningfulGraph(chatId, "authority-indexeddb-migration"),
{
revision: 4,
integrity: "meta-authority-indexeddb-migration",
chatId,
reason: "legacy-indexeddb",
},
);
const legacySnapshot = buildSnapshotFromGraph(legacyGraph, {
chatId,
revision: 4,
meta: {
integrity: "meta-authority-indexeddb-migration",
storagePrimary: "indexeddb",
storageMode: "indexeddb",
syncDirty: true,
},
});
const harness = await createGraphPersistenceHarness({
chatId,
globalChatId: chatId,
chatMetadata: {
integrity: "meta-authority-indexeddb-migration",
},
indexedDbSnapshot: legacySnapshot,
});
harness.runtimeContext.extension_settings[MODULE_NAME] = {
authorityEnabled: "on",
authorityPrimaryWhenAvailable: true,
authorityStorageMode: "server-primary",
authoritySqlPrimary: true,
authorityBrowserCacheMode: "minimal",
};
harness.api.setAuthorityCapabilityState({
installed: true,
healthy: true,
sessionReady: true,
permissionReady: true,
features: ["sql.query", "sql.mutation", "trivium.search", "jobs", "blob"],
reason: "ok",
lastProbeAt: Date.now(),
});
const result = await harness.api.loadGraphFromIndexedDb(chatId, {
source: "authority-indexeddb-migration",
attemptIndex: 0,
});
assert.equal(result.loaded, true);
assert.equal(result.reason, "authority-sql:authority-indexeddb-migration");
assert.equal(harness.runtimeContext.__syncNowCalls.length, 0);
const authoritySnapshot = harness.api.getAuthoritySnapshotForChat(chatId);
assert.equal(authoritySnapshot?.nodes?.length, 1);
assert.equal(authoritySnapshot?.meta?.storagePrimary, AUTHORITY_GRAPH_STORE_KIND);
assert.equal(authoritySnapshot?.meta?.storageMode, AUTHORITY_GRAPH_STORE_MODE);
assert.equal(
authoritySnapshot?.meta?.migrationSource,
"legacy_indexeddb_to_authority",
);
assert.equal(authoritySnapshot?.meta?.syncDirty, false);
const safetySnapshot = harness.api.getIndexedDbSnapshotForChat(
harness.runtimeContext.buildRestoreSafetyChatId(chatId),
);
assert.equal(safetySnapshot?.nodes?.length, 1);
assert.equal(safetySnapshot?.meta?.restoreSafetySnapshotExists, true);
assert.equal(safetySnapshot?.meta?.restoreSafetySnapshotChatId, chatId);
const live = harness.api.getGraphPersistenceLiveState();
assert.equal(live.storagePrimary, AUTHORITY_GRAPH_STORE_KIND);
assert.equal(live.storageMode, AUTHORITY_GRAPH_STORE_MODE);
assert.equal(live.authorityMigrationState, "completed");
assert.equal(live.authorityMigrationSource, "legacy_indexeddb_to_authority");
assert.equal(Number(live.authorityMigrationRevision), 4);
assert.equal(
live.lastAuthorityMigrationResult?.safetySnapshotResult?.restoreSafetyCaptured,
true,
);
assert.equal(
harness.runtimeContext.__indexedDbSnapshots.has(chatId),
true,
"迁移成功后仍保留 legacy IndexedDB 源数据,不删除本地数据",
);
}
{
const harness = await createGraphPersistenceHarness({
chatId: "chat-b",