Fix multi-device sync after persistence refactor

This commit is contained in:
Youzini-afk
2026-04-10 13:53:21 +08:00
parent a6b3137511
commit bbb44e7022
5 changed files with 356 additions and 52 deletions

184
index.js
View File

@@ -190,6 +190,7 @@ import {
} from "./runtime/settings-defaults.js";
import { retrieve } from "./retrieval/retriever.js";
import {
applyProcessedHistorySnapshotToGraph,
appendBatchJournal,
appendMaintenanceJournal,
buildRecoveryResult,
@@ -5896,16 +5897,22 @@ function buildGraphPersistResult({
};
}
function maybeCaptureGraphShadowSnapshot(reason = "runtime-shadow") {
const chatId = graphPersistenceState.chatId || getCurrentChatId();
if (!chatId || !currentGraph) return false;
function maybeCaptureGraphShadowSnapshot(
reason = "runtime-shadow",
{
graph = currentGraph,
chatId = graphPersistenceState.chatId || getCurrentChatId(),
revision = graphPersistenceState.revision,
} = {},
) {
if (!chatId || !graph) return false;
const hasMeaningfulGraphData =
!isGraphEffectivelyEmpty(currentGraph) ||
!isGraphEffectivelyEmpty(graph) ||
graphPersistenceState.shadowSnapshotUsed ||
graphPersistenceState.lastPersistedRevision > 0;
if (!hasMeaningfulGraphData) return false;
return writeGraphShadowSnapshot(chatId, currentGraph, {
revision: graphPersistenceState.revision,
return writeGraphShadowSnapshot(chatId, graph, {
revision,
reason,
});
}
@@ -6023,10 +6030,57 @@ function resolvePendingPersistLastProcessedAssistantFloor() {
return null;
}
function resolvePendingPersistGraphSource(chatId = "") {
const normalizedChatId = normalizeChatIdCandidate(
chatId || graphPersistenceState.queuedPersistChatId || graphPersistenceState.chatId,
);
const targetRevision = Math.max(
Number(graphPersistenceState.queuedPersistRevision || 0),
Number(graphPersistenceState.revision || 0),
);
const shadowSnapshot = normalizedChatId
? readGraphShadowSnapshot(normalizedChatId)
: null;
if (
shadowSnapshot &&
Number(shadowSnapshot.revision || 0) >= targetRevision &&
typeof shadowSnapshot.serializedGraph === "string" &&
shadowSnapshot.serializedGraph
) {
try {
const shadowGraph = cloneGraphForPersistence(
normalizeGraphRuntimeState(
deserializeGraph(shadowSnapshot.serializedGraph),
normalizedChatId,
),
normalizedChatId,
);
return {
graph: shadowGraph,
source: "shadow",
revision: Number(shadowSnapshot.revision || 0),
};
} catch (error) {
console.warn("[ST-BME] pending persist shadow graph 恢复失败:", error);
}
}
return {
graph: currentGraph,
source: "runtime",
revision: Math.max(
Number(getGraphPersistedRevision(currentGraph) || 0),
targetRevision,
),
};
}
function applyAcceptedPendingPersistState(
persistResult,
{
lastProcessedAssistantFloor = resolvePendingPersistLastProcessedAssistantFloor(),
persistedGraph = null,
} = {},
) {
ensureCurrentGraphRuntimeState();
@@ -6042,6 +6096,36 @@ function applyAcceptedPendingPersistState(
currentGraph.historyState.lastBatchStatus = batchStatus;
}
if (
persistedGraph &&
typeof persistedGraph === "object" &&
!Array.isArray(persistedGraph)
) {
const persistedHistory =
persistedGraph.historyState &&
typeof persistedGraph.historyState === "object" &&
!Array.isArray(persistedGraph.historyState)
? persistedGraph.historyState
: null;
if (persistedHistory) {
currentGraph.historyState.processedMessageHashVersion =
persistedHistory.processedMessageHashVersion ??
currentGraph.historyState.processedMessageHashVersion;
currentGraph.historyState.processedMessageHashes = cloneRuntimeDebugValue(
persistedHistory.processedMessageHashes || {},
currentGraph.historyState.processedMessageHashes || {},
);
currentGraph.historyState.processedMessageHashesNeedRefresh =
persistedHistory.processedMessageHashesNeedRefresh === true;
}
if (Array.isArray(persistedGraph.batchJournal)) {
currentGraph.batchJournal = cloneRuntimeDebugValue(
persistedGraph.batchJournal,
currentGraph.batchJournal || [],
);
}
}
if (
persistenceRecord.accepted === true &&
Number.isFinite(Number(lastProcessedAssistantFloor)) &&
@@ -6133,9 +6217,10 @@ function persistGraphToChatMetadata(
reason = "graph-persist",
revision = graphPersistenceState.revision,
immediate = false,
graph = currentGraph,
} = {},
) {
if (!context || !currentGraph) {
if (!context || !graph) {
return buildGraphPersistResult({
saved: false,
blocked: true,
@@ -6155,19 +6240,21 @@ function persistGraphToChatMetadata(
}
const nextIntegrity = getChatMetadataIntegrity(context);
const persistedGraph = cloneGraphForPersistence(currentGraph, chatId);
const persistedGraph = cloneGraphForPersistence(graph, chatId);
stampGraphPersistenceMeta(persistedGraph, {
revision,
reason,
chatId,
integrity: nextIntegrity,
});
stampGraphPersistenceMeta(currentGraph, {
revision,
reason,
chatId,
integrity: nextIntegrity,
});
if (graph === currentGraph) {
stampGraphPersistenceMeta(currentGraph, {
revision,
reason,
chatId,
integrity: nextIntegrity,
});
}
writeChatMetadataPatch(context, {
[GRAPH_METADATA_KEY]: persistedGraph,
});
@@ -6219,10 +6306,15 @@ function persistGraphToChatMetadata(
function queueGraphPersist(
reason = "graph-persist-blocked",
revision = graphPersistenceState.revision,
{ immediate = true } = {},
{ immediate = true, graph = currentGraph, chatId = undefined } = {},
) {
const queuedChatId = graphPersistenceState.chatId || getCurrentChatId();
const shadowCaptured = maybeCaptureGraphShadowSnapshot(reason);
const queuedChatId =
String(chatId || graphPersistenceState.chatId || getCurrentChatId()) || "";
const shadowCaptured = maybeCaptureGraphShadowSnapshot(reason, {
graph,
chatId: queuedChatId,
revision,
});
updateGraphPersistenceState({
queuedPersistRevision: Math.max(
graphPersistenceState.queuedPersistRevision || 0,
@@ -6380,16 +6472,20 @@ async function retryPendingGraphPersist({
});
}
const pendingPersistGraphSource = resolvePendingPersistGraphSource(
queuedChatId,
);
const pendingPersistGraph = pendingPersistGraphSource?.graph || currentGraph;
const targetRevision = Math.max(
Number(graphPersistenceState.queuedPersistRevision || 0),
Number(graphPersistenceState.revision || 0),
Number(graphPersistenceState.lastPersistedRevision || 0),
Number(getGraphPersistedRevision(currentGraph) || 0),
Number(pendingPersistGraphSource?.revision || 0),
Number(getGraphPersistedRevision(pendingPersistGraph) || 0),
);
const lastProcessedAssistantFloor =
resolvePendingPersistLastProcessedAssistantFloor();
const indexedDbResult = await saveGraphToIndexedDb(activeChatId, currentGraph, {
const indexedDbResult = await saveGraphToIndexedDb(activeChatId, pendingPersistGraph, {
revision: targetRevision,
reason,
});
@@ -6429,17 +6525,19 @@ async function retryPendingGraphPersist({
});
applyAcceptedPendingPersistState(persistResult, {
lastProcessedAssistantFloor,
persistedGraph: pendingPersistGraph,
});
void maybeResumePendingAutoExtraction("pending-persist-resolved:indexeddb");
return persistResult;
}
if (canPersistGraphToMetadataFallback(context, currentGraph)) {
if (canPersistGraphToMetadataFallback(context, pendingPersistGraph)) {
const metadataReason = `${reason}:metadata-full-fallback`;
const metadataResult = persistGraphToChatMetadata(context, {
reason: metadataReason,
revision: targetRevision,
immediate: true,
graph: pendingPersistGraph,
});
if (metadataResult?.saved) {
clearPendingGraphPersistRetry();
@@ -6477,6 +6575,7 @@ async function retryPendingGraphPersist({
});
applyAcceptedPendingPersistState(persistResult, {
lastProcessedAssistantFloor,
persistedGraph: pendingPersistGraph,
});
void maybeResumePendingAutoExtraction("pending-persist-resolved:metadata");
return persistResult;
@@ -6501,10 +6600,15 @@ async function retryPendingGraphPersist({
async function persistExtractionBatchResult({
reason = "extraction-batch-complete",
lastProcessedAssistantFloor = null,
graphSnapshot = null,
} = {}) {
ensureCurrentGraphRuntimeState();
const context = getContext();
if (!context || !currentGraph) {
const persistGraph =
graphSnapshot && typeof graphSnapshot === "object"
? cloneGraphSnapshot(graphSnapshot)
: currentGraph;
if (!context || !persistGraph) {
return buildGraphPersistResult({
saved: false,
blocked: true,
@@ -6526,7 +6630,7 @@ async function persistExtractionBatchResult({
}
const revision = bumpGraphRevision(reason);
const indexedDbResult = await saveGraphToIndexedDb(chatId, currentGraph, {
const indexedDbResult = await saveGraphToIndexedDb(chatId, persistGraph, {
revision,
reason,
});
@@ -6567,7 +6671,11 @@ async function persistExtractionBatchResult({
}
const shadowReason = `${reason}:shadow-fallback`;
const shadowCaptured = maybeCaptureGraphShadowSnapshot(shadowReason);
const shadowCaptured = maybeCaptureGraphShadowSnapshot(shadowReason, {
graph: persistGraph,
chatId,
revision,
});
if (shadowCaptured) {
if (isGraphMetadataWriteAllowed()) {
persistGraphCommitMarker(context, {
@@ -6596,6 +6704,10 @@ async function persistExtractionBatchResult({
queuedPersistReason: "",
});
clearPendingGraphPersistRetry();
queueGraphPersistToIndexedDb(chatId, persistGraph, {
revision,
reason: `${shadowReason}:promote-indexeddb`,
});
return buildGraphPersistResult({
saved: false,
accepted: true,
@@ -6606,12 +6718,13 @@ async function persistExtractionBatchResult({
});
}
if (canPersistGraphToMetadataFallback(context, currentGraph)) {
if (canPersistGraphToMetadataFallback(context, persistGraph)) {
const metadataReason = `${reason}:metadata-full-fallback`;
const metadataResult = persistGraphToChatMetadata(context, {
reason: metadataReason,
revision,
immediate: true,
graph: persistGraph,
});
if (metadataResult?.saved) {
persistGraphCommitMarker(context, {
@@ -6637,6 +6750,10 @@ async function persistExtractionBatchResult({
queuedPersistReason: "",
});
clearPendingGraphPersistRetry();
queueGraphPersistToIndexedDb(chatId, persistGraph, {
revision,
reason: `${metadataReason}:promote-indexeddb`,
});
return buildGraphPersistResult({
saved: true,
accepted: true,
@@ -6650,8 +6767,9 @@ async function persistExtractionBatchResult({
const queuedResult = queueGraphPersist(`${reason}:pending`, revision, {
immediate: true,
graph: persistGraph,
chatId,
});
schedulePendingGraphPersistRetry(`${reason}:pending`, 0);
updateGraphPersistenceState({
pendingPersist: true,
lastPersistReason: String(queuedResult.reason || `${reason}:pending`),
@@ -7331,14 +7449,11 @@ function markVectorStateDirty(reason = "向量状态已标记为待重建") {
function updateProcessedHistorySnapshot(chat, lastProcessedAssistantFloor) {
ensureCurrentGraphRuntimeState();
currentGraph.historyState.lastProcessedAssistantFloor =
lastProcessedAssistantFloor;
currentGraph.historyState.processedMessageHashVersion =
PROCESSED_MESSAGE_HASH_VERSION;
currentGraph.historyState.processedMessageHashes =
snapshotProcessedMessageHashes(chat, lastProcessedAssistantFloor);
currentGraph.historyState.processedMessageHashesNeedRefresh = false;
currentGraph.lastProcessedSeq = lastProcessedAssistantFloor;
applyProcessedHistorySnapshotToGraph(
currentGraph,
chat,
lastProcessedAssistantFloor,
);
}
function shouldAdvanceProcessedHistory(batchStatus) {
@@ -10324,6 +10439,7 @@ async function executeExtractionBatch({
return await executeExtractionBatchController(
{
appendBatchJournal,
applyProcessedHistorySnapshotToGraph,
buildExtractionMessages,
cloneGraphSnapshot,
computePostProcessArtifacts,

View File

@@ -84,6 +84,98 @@ function normalizePersistenceStateRecord(persistResult = null) {
};
}
function cloneSerializable(value, fallback = null) {
try {
return JSON.parse(JSON.stringify(value));
} catch {
return fallback;
}
}
function buildCommittedBatchPersistSnapshot(
runtime,
{
graph = null,
chat = [],
beforeSnapshot = null,
processedRange = [null, null],
postProcessArtifacts = [],
vectorHashesInserted = [],
extractionCountBefore = 0,
} = {},
) {
if (!graph || typeof runtime?.cloneGraphSnapshot !== "function") {
return {
persistGraphSnapshot: null,
committedBatchJournalEntry: null,
afterSnapshot: null,
committedAfterSnapshot: null,
postProcessArtifacts: Array.isArray(postProcessArtifacts)
? [...postProcessArtifacts]
: [],
};
}
const range = Array.isArray(processedRange) ? processedRange : [null, null];
const rangeStart = Number.isFinite(Number(range[0])) ? Number(range[0]) : null;
const rangeEnd = Number.isFinite(Number(range[1])) ? Number(range[1]) : null;
const afterSnapshot = runtime.cloneGraphSnapshot(graph);
const effectiveArtifacts = Array.isArray(postProcessArtifacts)
? [...postProcessArtifacts]
: [];
const committedGraphSnapshot = runtime.cloneGraphSnapshot(graph);
if (typeof runtime.applyProcessedHistorySnapshotToGraph === "function") {
runtime.applyProcessedHistorySnapshotToGraph(
committedGraphSnapshot,
chat,
rangeEnd,
);
} else {
if (
!committedGraphSnapshot.historyState ||
typeof committedGraphSnapshot.historyState !== "object" ||
Array.isArray(committedGraphSnapshot.historyState)
) {
committedGraphSnapshot.historyState = {};
}
committedGraphSnapshot.historyState.lastProcessedAssistantFloor =
Number.isFinite(rangeEnd) ? Math.floor(rangeEnd) : -1;
committedGraphSnapshot.lastProcessedSeq =
Number.isFinite(rangeEnd) ? Math.floor(rangeEnd) : -1;
}
const committedBatchJournalEntry =
typeof runtime.createBatchJournalEntry === "function"
? runtime.createBatchJournalEntry(beforeSnapshot, afterSnapshot, {
processedRange: [rangeStart, rangeEnd],
postProcessArtifacts: effectiveArtifacts,
vectorHashesInserted: Array.isArray(vectorHashesInserted)
? vectorHashesInserted
: [],
extractionCountBefore,
})
: null;
if (
committedBatchJournalEntry &&
typeof runtime.appendBatchJournal === "function"
) {
runtime.appendBatchJournal(
committedGraphSnapshot,
cloneSerializable(committedBatchJournalEntry, committedBatchJournalEntry),
);
}
return {
persistGraphSnapshot: committedGraphSnapshot,
committedBatchJournalEntry,
afterSnapshot,
committedAfterSnapshot: runtime.cloneGraphSnapshot(committedGraphSnapshot),
postProcessArtifacts: effectiveArtifacts,
};
}
function getPendingPersistenceGateInfo(runtime) {
const graph = runtime?.getCurrentGraph?.();
const batchStatus = graph?.historyState?.lastBatchStatus || null;
@@ -335,9 +427,23 @@ export async function executeExtractionBatchController(
batchStatus,
);
const batchStatusRef = effects?.batchStatus || batchStatus;
const committedPersistState = buildCommittedBatchPersistSnapshot(runtime, {
graph: runtime.getCurrentGraph(),
chat,
beforeSnapshot,
processedRange: [startIdx, endIdx],
postProcessArtifacts: runtime.computePostProcessArtifacts(
beforeSnapshot,
runtime.cloneGraphSnapshot(runtime.getCurrentGraph()),
effects?.postProcessArtifacts || [],
),
vectorHashesInserted: effects?.vectorHashesInserted || [],
extractionCountBefore,
});
const persistResult = await runtime.persistExtractionBatchResult({
reason: "extraction-batch-complete",
lastProcessedAssistantFloor: endIdx,
graphSnapshot: committedPersistState.persistGraphSnapshot,
});
const persistence = normalizePersistenceStateRecord(persistResult);
batchStatusRef.persistence = persistence;
@@ -359,6 +465,15 @@ export async function executeExtractionBatchController(
if (runtime.getCurrentGraph().historyState.lastBatchStatus.historyAdvanced) {
runtime.updateProcessedHistorySnapshot(chat, endIdx);
if (committedPersistState.committedBatchJournalEntry) {
runtime.appendBatchJournal(
runtime.getCurrentGraph(),
cloneSerializable(
committedPersistState.committedBatchJournalEntry,
committedPersistState.committedBatchJournalEntry,
),
);
}
} else if (!persistence.accepted) {
runtime.setLastExtractionStatus(
"提取待恢复",
@@ -373,22 +488,6 @@ export async function executeExtractionBatchController(
});
}
const afterSnapshot = runtime.cloneGraphSnapshot(runtime.getCurrentGraph());
const postProcessArtifacts = runtime.computePostProcessArtifacts(
beforeSnapshot,
afterSnapshot,
effects?.postProcessArtifacts || [],
);
runtime.appendBatchJournal(
runtime.getCurrentGraph(),
runtime.createBatchJournalEntry(beforeSnapshot, afterSnapshot, {
processedRange: [startIdx, endIdx],
postProcessArtifacts,
vectorHashesInserted: effects?.vectorHashesInserted || [],
extractionCountBefore,
}),
);
return {
success: finalizedBatchStatus.completed,
result,

View File

@@ -388,6 +388,38 @@ export function snapshotProcessedMessageHashes(
return result;
}
export function applyProcessedHistorySnapshotToGraph(
graph,
chat,
lastProcessedAssistantFloor,
) {
if (!graph || typeof graph !== "object") {
return graph;
}
const historyState =
graph.historyState && typeof graph.historyState === "object"
? graph.historyState
: createDefaultHistoryState(graph?.historyState?.chatId || "");
graph.historyState = historyState;
const safeLastProcessedAssistantFloor = Number.isFinite(
Number(lastProcessedAssistantFloor),
)
? Math.floor(Number(lastProcessedAssistantFloor))
: -1;
historyState.lastProcessedAssistantFloor = safeLastProcessedAssistantFloor;
historyState.processedMessageHashVersion = PROCESSED_MESSAGE_HASH_VERSION;
historyState.processedMessageHashes =
safeLastProcessedAssistantFloor >= 0
? snapshotProcessedMessageHashes(chat, safeLastProcessedAssistantFloor)
: {};
historyState.processedMessageHashesNeedRefresh = false;
graph.lastProcessedSeq = safeLastProcessedAssistantFloor;
return graph;
}
export function rebindProcessedHistoryStateToChat(
graph,
chat,

View File

@@ -12,12 +12,15 @@ function createRuntime(persistResult) {
nodes: [],
edges: [],
historyState: {},
batchJournal: [],
};
let processedHistoryUpdates = 0;
let persistedGraphSnapshot = null;
return {
graph,
processedHistoryUpdates,
persistedGraphSnapshot,
ensureCurrentGraphRuntimeState() {},
throwIfAborted() {},
getCurrentGraph() {
@@ -64,6 +67,7 @@ function createRuntime(persistResult) {
};
},
async persistExtractionBatchResult() {
persistedGraphSnapshot = arguments[0]?.graphSnapshot || null;
return persistResult;
},
finalizeBatchStatus,
@@ -73,13 +77,20 @@ function createRuntime(persistResult) {
updateProcessedHistorySnapshot() {
processedHistoryUpdates += 1;
},
appendBatchJournal() {},
appendBatchJournal(targetGraph, entry) {
if (!targetGraph.batchJournal) targetGraph.batchJournal = [];
targetGraph.batchJournal.push(entry);
},
createBatchJournalEntry() {
return { id: "journal-1" };
return { id: "journal-1", processedRange: [5, 5] };
},
computePostProcessArtifacts() {
return [];
},
applyProcessedHistorySnapshotToGraph(targetGraph, _chat, floor) {
targetGraph.historyState.lastProcessedAssistantFloor = floor;
targetGraph.lastProcessedSeq = floor;
},
getGraphPersistenceState() {
return { chatId: "chat-test" };
},
@@ -87,6 +98,9 @@ function createRuntime(persistResult) {
get processedHistoryUpdates() {
return processedHistoryUpdates;
},
get persistedGraphSnapshot() {
return persistedGraphSnapshot;
},
};
}
@@ -119,6 +133,14 @@ function createRuntime(persistResult) {
runtime.graph.historyState.lastBatchStatus.historyAdvanceAllowed,
false,
);
assert.equal(
runtime.persistedGraphSnapshot?.historyState?.lastProcessedAssistantFloor,
5,
);
assert.equal(
runtime.persistedGraphSnapshot?.batchJournal?.length,
1,
);
}
{
@@ -150,6 +172,14 @@ function createRuntime(persistResult) {
runtime.graph.historyState.lastBatchStatus.historyAdvanceAllowed,
true,
);
assert.equal(
runtime.persistedGraphSnapshot?.historyState?.lastProcessedAssistantFloor,
5,
);
assert.equal(
runtime.persistedGraphSnapshot?.batchJournal?.length,
1,
);
}
console.log("extraction-persistence-gating tests passed");

View File

@@ -2332,6 +2332,16 @@ result = {
historyAdvanceAllowed: false,
historyAdvanced: false,
};
const committedGraph = structuredClone(graph);
committedGraph.historyState.lastProcessedAssistantFloor = 1;
committedGraph.lastProcessedSeq = 1;
committedGraph.batchJournal = [
{
id: "journal-queued-1",
processedRange: [1, 1],
createdAt: Date.now(),
},
];
harness.api.setCurrentGraph(graph);
harness.api.setGraphPersistenceState({
loadState: "loaded",
@@ -2344,6 +2354,14 @@ result = {
pendingPersist: true,
writesBlocked: false,
});
harness.api.writeGraphShadowSnapshot(
"chat-pending-persist-retry",
committedGraph,
{
revision: 7,
reason: "queued-persist-authoritative",
},
);
harness.runtimeContext.__markSyncDirtyShouldThrow = true;
const result = await harness.api.retryPendingGraphPersist({
@@ -2369,6 +2387,15 @@ result = {
harness.api.getCurrentGraph().historyState.lastBatchStatus.persistence.outcome,
"saved",
);
assert.equal(
harness.api.getCurrentGraph().batchJournal?.length,
1,
"pending persist retry 应把 authoritative batch journal 回填到 runtime graph",
);
assert.equal(
harness.api.getCurrentGraph().batchJournal?.[0]?.id,
"journal-queued-1",
);
}
{