fix: backend vector state marked dirty on sync/restore import and query failure

- sync/bme-sync.js: conservatively clear backend hash mappings and mark
  vectorIndexState dirty before importing remote snapshots via download,
  merge, and cloud backup restore, preventing stale clean-looking state
  after cross-device sync or restore
- vector/vector-index.js: mark backend vector state dirty on real backend
  query failures (HTTP/network) instead of silently returning empty results
- regression: indexeddb-sync.mjs covers download/restore/merge import
  dirty marking; p0-regressions.mjs covers backend query failure dirtying
This commit is contained in:
Youzini-afk
2026-04-12 19:42:36 +08:00
parent d350de809e
commit 913a102b39
4 changed files with 331 additions and 42 deletions

View File

@@ -793,6 +793,54 @@ function normalizeSyncSnapshot(snapshot = {}, chatId = "") {
};
}
/**
 * Conservatively flags a backend-mode vector snapshot as needing a rebuild
 * before it is imported from a remote source (sync download, merge, or
 * cloud-backup restore).
 *
 * Clears the hash<->node mappings and rewrites `lastStats` so the imported
 * state can never look "clean" in the current environment. The snapshot is
 * mutated in place and returned for chaining. Non-object snapshots, snapshots
 * without an object `meta`, and non-backend vector modes pass through
 * untouched.
 *
 * @param {object} snapshot - sync snapshot whose `meta` may carry runtime vector state
 * @param {string} reason - machine-readable dirty reason recorded on the state
 * @param {string} warning - user-facing warning recorded on the state
 * @returns {object} the same snapshot reference
 */
function markBackendVectorSnapshotDirty(
  snapshot = {},
  reason = "backend-sync-import-unverified",
  warning = "后端向量索引需要在当前环境重建",
) {
  const isPlainObject = (value) =>
    Boolean(value) && typeof value === "object" && !Array.isArray(value);
  if (!isPlainObject(snapshot) || !isPlainObject(snapshot.meta)) {
    return snapshot;
  }
  const vectorMeta = normalizeRuntimeVectorMeta(
    snapshot.meta?.[RUNTIME_VECTOR_META_KEY],
  );
  if (vectorMeta.mode !== "backend") {
    return snapshot;
  }
  // Estimate the node count from whichever signal is largest: the recorded
  // stats or either side of the hash mapping.
  const total = Math.max(
    normalizeNonNegativeInteger(vectorMeta.lastStats?.total, 0),
    Object.keys(vectorMeta.nodeToHash || {}).length,
    Object.keys(vectorMeta.hashToNodeId || {}).length,
  );
  // A non-empty index must report at least one pending entry so rebuild
  // machinery is guaranteed to engage.
  let pending = normalizeNonNegativeInteger(vectorMeta.lastStats?.pending, 0);
  if (total > 0) {
    pending = Math.max(1, pending);
  }
  snapshot.meta[RUNTIME_VECTOR_META_KEY] = {
    ...vectorMeta,
    hashToNodeId: {},
    nodeToHash: {},
    replayRequiredNodeIds: [],
    dirty: true,
    dirtyReason: String(reason || "backend-sync-import-unverified"),
    pendingRepairFromFloor: 0,
    lastStats: {
      total,
      indexed: 0,
      stale: total,
      pending,
    },
    lastWarning: String(warning || "后端向量索引需要在当前环境重建"),
  };
  return snapshot;
}
function createRecordWinnerByUpdatedAt(localRecord, remoteRecord) {
if (!localRecord) return remoteRecord || null;
if (!remoteRecord) return localRecord || null;
@@ -2057,9 +2105,13 @@ export async function restoreFromServer(chatId, options = {}) {
};
}
const snapshot = markManualBackupHistoryForLocalRebind(
envelope.snapshot,
normalizedChatId,
const snapshot = markBackendVectorSnapshotDirty(
markManualBackupHistoryForLocalRebind(
envelope.snapshot,
normalizedChatId,
),
"backend-backup-restore-unverified",
"后端向量索引已从云备份恢复,需要在当前环境重建",
);
if (normalizeChatId(snapshot.meta?.chatId) !== normalizedChatId) {
return {
@@ -2285,7 +2337,11 @@ export async function download(chatId, options = {}) {
};
}
const remoteSnapshot = normalizeSyncSnapshot(remoteResult.snapshot, normalizedChatId);
const remoteSnapshot = markBackendVectorSnapshotDirty(
normalizeSyncSnapshot(remoteResult.snapshot, normalizedChatId),
"backend-sync-download-unverified",
"后端向量索引已从远端同步恢复,需要在当前环境重建",
);
const remoteRevision = normalizeRevision(remoteSnapshot.meta.revision);
await db.importSnapshot(remoteSnapshot, {
@@ -2615,9 +2671,13 @@ export async function syncNow(chatId, options = {}) {
};
}
const mergedSnapshot = mergeSnapshots(localSnapshot, remoteSnapshot, {
chatId: normalizedChatId,
});
const mergedSnapshot = markBackendVectorSnapshotDirty(
mergeSnapshots(localSnapshot, remoteSnapshot, {
chatId: normalizedChatId,
}),
"backend-sync-merge-unverified",
"后端向量索引已从远端合并恢复,需要在当前环境重建",
);
await db.importSnapshot(mergedSnapshot, {
mode: "replace",

View File

@@ -332,6 +332,23 @@ async function testDownloadImport() {
nodeCount: 1,
edgeCount: 0,
tombstoneCount: 0,
runtimeVectorIndexState: {
mode: "backend",
collectionId: "st-bme::chat-download",
source: "openai",
hashToNodeId: {
"hash-remote-node": "remote-node",
},
nodeToHash: {
"remote-node": "hash-remote-node",
},
lastStats: {
total: 1,
indexed: 1,
stale: 0,
pending: 0,
},
},
},
nodes: [{ id: "remote-node", updatedAt: 400 }],
edges: [],
@@ -348,6 +365,17 @@ async function testDownloadImport() {
assert.equal(result.downloaded, true);
assert.equal(db.lastImportPayload.meta.revision, 12);
assert.equal(db.lastImportPayload.nodes[0].id, "remote-node");
assert.equal(db.lastImportPayload.meta.runtimeVectorIndexState.dirty, true);
assert.equal(
db.lastImportPayload.meta.runtimeVectorIndexState.dirtyReason,
"backend-sync-download-unverified",
);
assert.deepEqual(db.lastImportPayload.meta.runtimeVectorIndexState.hashToNodeId, {});
assert.deepEqual(db.lastImportPayload.meta.runtimeVectorIndexState.nodeToHash, {});
assert.equal(
db.lastImportPayload.meta.runtimeVectorIndexState.pendingRepairFromFloor,
0,
);
}
async function testLegacyRemoteFilenameFallbackAndReuse() {
@@ -648,6 +676,23 @@ async function testManualBackupAndRestoreFlow() {
{ id: "journal-5", processedRange: [4, 4], createdAt: 55 },
{ id: "journal-6", processedRange: [5, 5], createdAt: 66 },
],
runtimeVectorIndexState: {
mode: "backend",
collectionId: "st-bme::chat-backup-flow",
source: "openai",
hashToNodeId: {
"hash-local-node": "local-node",
},
nodeToHash: {
"local-node": "hash-local-node",
},
lastStats: {
total: 1,
indexed: 1,
stale: 0,
pending: 0,
},
},
maintenanceJournal: [
{ id: "maintenance-a", updatedAt: 70 },
{ id: "maintenance-b", updatedAt: 80 },
@@ -768,6 +813,13 @@ async function testManualBackupAndRestoreFlow() {
assert.equal(db.snapshot.meta.runtimeHistoryState.lastMutationReason, "");
assert.equal(db.snapshot.meta.runtimeHistoryState.lastMutationSource, "");
assert.equal(db.snapshot.meta.runtimeHistoryState.lastRecoveryResult, null);
assert.equal(db.snapshot.meta.runtimeVectorIndexState.dirty, true);
assert.equal(
db.snapshot.meta.runtimeVectorIndexState.dirtyReason,
"backend-backup-restore-unverified",
);
assert.deepEqual(db.snapshot.meta.runtimeVectorIndexState.hashToNodeId, {});
assert.deepEqual(db.snapshot.meta.runtimeVectorIndexState.nodeToHash, {});
assert.ok(Number(db.meta.get("lastBackupRestoredAt")) > 0);
const safetyStatus = await getRestoreSafetySnapshotStatus(
"chat-backup-flow",
@@ -1247,6 +1299,23 @@ async function testSyncAppliedHook() {
nodeCount: 1,
edgeCount: 0,
tombstoneCount: 0,
runtimeVectorIndexState: {
mode: "backend",
collectionId: "st-bme::chat-hook-merge",
source: "openai",
hashToNodeId: {
"hash-local-merge": "local-merge",
},
nodeToHash: {
"local-merge": "hash-local-merge",
},
lastStats: {
total: 1,
indexed: 1,
stale: 0,
pending: 0,
},
},
},
nodes: [{ id: "local-merge", updatedAt: 20 }],
edges: [],
@@ -1263,7 +1332,33 @@ async function testSyncAppliedHook() {
state: { lastProcessedFloor: 2, extractionCount: 1 },
});
remoteFiles.set("ST-BME_sync_chat-hook-merge.json", {
meta: { schemaVersion: 1, chatId: "chat-hook-merge", revision: 4, lastModified: 25, deviceId: "remote", nodeCount: 1, edgeCount: 0, tombstoneCount: 0 },
meta: {
schemaVersion: 1,
chatId: "chat-hook-merge",
revision: 4,
lastModified: 25,
deviceId: "remote",
nodeCount: 1,
edgeCount: 0,
tombstoneCount: 0,
runtimeVectorIndexState: {
mode: "backend",
collectionId: "st-bme::chat-hook-merge",
source: "openai",
hashToNodeId: {
"hash-remote-merge": "remote-merge",
},
nodeToHash: {
"remote-merge": "hash-remote-merge",
},
lastStats: {
total: 1,
indexed: 1,
stale: 0,
pending: 0,
},
},
},
nodes: [{ id: "remote-merge", updatedAt: 25 }],
edges: [],
tombstones: [],
@@ -1284,6 +1379,22 @@ async function testSyncAppliedHook() {
assert.equal(downloadResult.revision, 3);
assert.equal(mergeResult.revision, 5);
assert.equal(
dbByChatId.get("chat-hook-merge").lastImportPayload.meta.runtimeVectorIndexState.dirty,
true,
);
assert.equal(
dbByChatId.get("chat-hook-merge").lastImportPayload.meta.runtimeVectorIndexState.dirtyReason,
"backend-sync-merge-unverified",
);
assert.deepEqual(
dbByChatId.get("chat-hook-merge").lastImportPayload.meta.runtimeVectorIndexState.hashToNodeId,
{},
);
assert.deepEqual(
dbByChatId.get("chat-hook-merge").lastImportPayload.meta.runtimeVectorIndexState.nodeToHash,
{},
);
assert.deepEqual(hookCalls.map((item) => item.action), ["download", "merge"]);
assert.deepEqual(hookCalls.map((item) => item.chatId), ["chat-hook-download", "chat-hook-merge"]);

View File

@@ -155,7 +155,10 @@ const {
removeNode,
} = await import("../graph/graph.js");
const { compressType } = await import("../maintenance/compressor.js");
const { syncGraphVectorIndex } = await import("../vector/vector-index.js");
const {
findSimilarNodesByText,
syncGraphVectorIndex,
} = await import("../vector/vector-index.js");
const {
extractMemories,
generateReflection,
@@ -1984,6 +1987,55 @@ async function testVectorIndexKeepsDirtyOnDirectPartialEmbeddingFailure() {
}
}
// Regression: a hard network failure during a backend vector query must
// reject with the original error AND leave the graph's vector state flagged
// dirty (reason "backend-query-failed") so a rebuild is scheduled.
async function testBackendVectorQueryFailureMarksStateDirty() {
  const savedFetch = globalThis.fetch;
  const graph = normalizeGraphRuntimeState(createEmptyGraph(), "chat-backend-query");
  const node = makeEvent(1, "后端向量节点");
  addNode(graph, node);
  // Seed a fully-indexed backend state so we can observe it being dirtied.
  Object.assign(graph.vectorIndexState, {
    mode: "backend",
    source: "openai",
    collectionId: "st-bme::chat-backend-query",
    hashToNodeId: { "hash-backend-node": node.id },
    nodeToHash: { [node.id]: "hash-backend-node" },
    lastStats: { total: 1, indexed: 1, stale: 0, pending: 0 },
  });
  // Simulate the backend being unreachable.
  globalThis.fetch = async () => {
    throw new Error("backend-down");
  };
  try {
    const queryConfig = {
      mode: "backend",
      source: "openai",
      model: "text-embedding-3-small",
    };
    await assert.rejects(
      findSimilarNodesByText(graph, "测试后端向量失败", queryConfig, 5, [node]),
      /backend-down/,
    );
    assert.equal(graph.vectorIndexState.dirty, true);
    assert.equal(graph.vectorIndexState.dirtyReason, "backend-query-failed");
    assert.equal(graph.vectorIndexState.pendingRepairFromFloor, 0);
    assert.match(graph.vectorIndexState.lastWarning, /后端向量查询失败/);
  } finally {
    globalThis.fetch = savedFetch;
  }
}
async function testCompressTypeAcceptsTopLevelFieldsResult() {
const graph = createEmptyGraph();
const typeDef = {
@@ -6725,6 +6777,7 @@ async function testManualSleepExplainsThatItIsLocalOnlyWhenNothingChanges() {
await testCompressorMigratesEdgesToCompressedNode();
await testVectorIndexKeepsDirtyOnDirectPartialEmbeddingFailure();
await testBackendVectorQueryFailureMarksStateDirty();
await testCompressTypeAcceptsTopLevelFieldsResult();
await testExtractorFailsOnUnknownOperation();
await testExtractorNormalizesFlatCreateOperation();

View File

@@ -488,6 +488,50 @@ function resetVectorMappings(graph, config, chatId) {
graph.vectorIndexState.nodeToHash = {};
}
/**
 * Marks the live backend vector index state on `graph` as dirty after a real
 * backend failure (HTTP error or network throw) so the index is rebuilt
 * instead of silently serving stale results.
 *
 * No-op unless the graph carries a `vectorIndexState` and `config` describes
 * a backend vector setup. Mutates `graph.vectorIndexState` in place.
 *
 * @param {object} graph - runtime graph holding `vectorIndexState`
 * @param {object} config - vector configuration (must be backend mode)
 * @param {string} reason - machine-readable dirty reason
 * @param {string} warning - user-facing warning message
 */
function markBackendVectorStateDirty(
  graph,
  config,
  reason = "backend-query-failed",
  warning = "后端向量查询失败,已标记待重建",
) {
  if (!graph?.vectorIndexState || !isBackendVectorConfig(config)) {
    return;
  }
  // Clamp an arbitrary value to a non-negative integer; unusable → 0.
  const toCount = (value) => {
    const parsed = Number(value);
    return Number.isFinite(parsed) ? Math.max(0, Math.floor(parsed)) : 0;
  };
  const state = graph.vectorIndexState;
  // Best-known index size: recorded total or either side of the hash map.
  const total = Math.max(
    Number(state.lastStats?.total || 0),
    Object.keys(state.nodeToHash || {}).length,
    Object.keys(state.hashToNodeId || {}).length,
  );
  const keptIndexed = toCount(state.lastStats?.indexed);
  const keptStale = toCount(state.lastStats?.stale);
  const keptPending = toCount(state.lastStats?.pending);
  state.mode = "backend";
  state.source = config.source || state.source || "";
  state.modelScope = getVectorModelScope(config) || state.modelScope || "";
  state.collectionId = buildVectorCollectionId(graph?.historyState?.chatId);
  state.dirty = true;
  state.dirtyReason = String(reason || "backend-query-failed");
  state.pendingRepairFromFloor = toCount(state.pendingRepairFromFloor);
  // A non-empty index must report at least one stale and one pending entry
  // so downstream repair logic is guaranteed to run.
  state.lastStats = {
    total,
    indexed: keptIndexed,
    stale: total > 0 ? Math.max(keptStale, 1) : keptStale,
    pending: total > 0 ? Math.max(keptPending, 1) : keptPending,
  };
  state.lastWarning = String(warning || "后端向量查询失败,已标记待重建");
}
export async function syncGraphVectorIndex(
graph,
config,
@@ -723,41 +767,62 @@ export async function findSimilarNodesByText(
const validation = validateVectorConfig(config);
if (!validation.valid) return [];
const response = await fetchWithTimeout(
"/api/vector/query",
{
method: "POST",
headers: getRequestHeaders(),
signal,
body: JSON.stringify({
collectionId: graph.vectorIndexState.collectionId,
searchText: text,
topK,
threshold: 0,
...buildBackendSourceRequest(config),
}),
},
getConfiguredTimeoutMs(config),
);
try {
const response = await fetchWithTimeout(
"/api/vector/query",
{
method: "POST",
headers: getRequestHeaders(),
signal,
body: JSON.stringify({
collectionId: graph.vectorIndexState.collectionId,
searchText: text,
topK,
threshold: 0,
...buildBackendSourceRequest(config),
}),
},
getConfiguredTimeoutMs(config),
);
if (!response.ok) {
const errorText = await response.text().catch(() => response.statusText);
console.warn("[ST-BME] 后端向量查询失败:", errorText);
return [];
if (!response.ok) {
const errorText = await response.text().catch(() => response.statusText);
const message = errorText || response.statusText || `HTTP ${response.status}`;
console.warn("[ST-BME] 后端向量查询失败:", message);
markBackendVectorStateDirty(
graph,
config,
"backend-query-failed",
`后端向量查询失败(${message}),已标记待重建`,
);
return [];
}
const data = await response.json().catch(() => ({ hashes: [] }));
const hashes = Array.isArray(data?.hashes) ? data.hashes : [];
const nodeIdByHash = graph.vectorIndexState?.hashToNodeId || {};
const allowedIds = new Set(candidateNodes.map((node) => node.id));
return hashes
.map((hash, index) => ({
nodeId: nodeIdByHash[hash],
score: Math.max(0.01, 1 - index / Math.max(1, hashes.length)),
}))
.filter((entry) => entry.nodeId && allowedIds.has(entry.nodeId))
.slice(0, topK);
} catch (error) {
if (isAbortError(error)) {
throw error;
}
const message = error?.message || String(error) || "后端向量查询失败";
markBackendVectorStateDirty(
graph,
config,
"backend-query-failed",
`后端向量查询失败(${message}),已标记待重建`,
);
throw error;
}
const data = await response.json().catch(() => ({ hashes: [] }));
const hashes = Array.isArray(data?.hashes) ? data.hashes : [];
const nodeIdByHash = graph.vectorIndexState?.hashToNodeId || {};
const allowedIds = new Set(candidateNodes.map((node) => node.id));
return hashes
.map((hash, index) => ({
nodeId: nodeIdByHash[hash],
score: Math.max(0.01, 1 - index / Math.max(1, hashes.length)),
}))
.filter((entry) => entry.nodeId && allowedIds.has(entry.nodeId))
.slice(0, topK);
}
export async function testVectorConnection(config, chatId = "connection-test") {