mirror of
https://github.com/Youzini-afk/ST-Bionic-Memory-Ecology.git
synced 2026-06-13 18:31:16 +08:00
fix: deep repair p0-p1 persistence runtime merge and integrity
This commit is contained in:
640
bme-sync.js
640
bme-sync.js
@@ -11,6 +11,14 @@ const sanitizedFilenameByChatId = new Map();
|
||||
let visibilitySyncInstalled = false;
|
||||
let lastVisibilityState = "visible";
|
||||
|
||||
const RUNTIME_HISTORY_META_KEY = "runtimeHistoryState";
|
||||
const RUNTIME_VECTOR_META_KEY = "runtimeVectorIndexState";
|
||||
const RUNTIME_BATCH_JOURNAL_META_KEY = "runtimeBatchJournal";
|
||||
const RUNTIME_LAST_RECALL_META_KEY = "runtimeLastRecallResult";
|
||||
const RUNTIME_LAST_PROCESSED_SEQ_META_KEY = "runtimeLastProcessedSeq";
|
||||
const RUNTIME_GRAPH_VERSION_META_KEY = "runtimeGraphVersion";
|
||||
const RUNTIME_BATCH_JOURNAL_LIMIT = 96;
|
||||
|
||||
function normalizeChatId(chatId) {
|
||||
return String(chatId ?? "").trim();
|
||||
}
|
||||
@@ -280,6 +288,523 @@ function mergeRecordCollectionById(localRecords = [], remoteRecords = []) {
|
||||
return Array.from(mergedById.values());
|
||||
}
|
||||
|
||||
function normalizeNonNegativeInteger(value, fallback = 0) {
|
||||
const parsed = Number(value);
|
||||
if (!Number.isFinite(parsed) || parsed < 0) {
|
||||
return Math.max(0, Math.floor(Number(fallback) || 0));
|
||||
}
|
||||
return Math.floor(parsed);
|
||||
}
|
||||
|
||||
function normalizeOptionalFloor(value) {
|
||||
const parsed = Number(value);
|
||||
if (!Number.isFinite(parsed)) return null;
|
||||
return Math.max(0, Math.floor(parsed));
|
||||
}
|
||||
|
||||
function normalizeStringMap(record = {}) {
|
||||
if (!record || typeof record !== "object" || Array.isArray(record)) {
|
||||
return {};
|
||||
}
|
||||
|
||||
const normalized = {};
|
||||
for (const [key, value] of Object.entries(record)) {
|
||||
const normalizedKey = String(key || "").trim();
|
||||
const normalizedValue = String(value || "").trim();
|
||||
if (!normalizedKey || !normalizedValue) continue;
|
||||
normalized[normalizedKey] = normalizedValue;
|
||||
}
|
||||
return normalized;
|
||||
}
|
||||
|
||||
function normalizeProcessedMessageHashes(record = {}) {
|
||||
if (!record || typeof record !== "object" || Array.isArray(record)) {
|
||||
return {};
|
||||
}
|
||||
|
||||
const normalized = {};
|
||||
for (const [floorKey, hashValue] of Object.entries(record)) {
|
||||
const floor = Number.parseInt(floorKey, 10);
|
||||
const normalizedHash = String(hashValue || "").trim();
|
||||
if (!Number.isFinite(floor) || floor < 0 || !normalizedHash) continue;
|
||||
normalized[String(floor)] = normalizedHash;
|
||||
}
|
||||
return normalized;
|
||||
}
|
||||
|
||||
function sortProcessedMessageHashes(record = {}) {
|
||||
const sorted = {};
|
||||
const keys = Object.keys(record)
|
||||
.map((value) => Number.parseInt(value, 10))
|
||||
.filter(Number.isFinite)
|
||||
.sort((left, right) => left - right);
|
||||
|
||||
for (const key of keys) {
|
||||
sorted[String(key)] = record[String(key)];
|
||||
}
|
||||
return sorted;
|
||||
}
|
||||
|
||||
function normalizeStringArray(value) {
|
||||
return Array.isArray(value)
|
||||
? [...new Set(value.map((item) => String(item || "").trim()).filter(Boolean))]
|
||||
: [];
|
||||
}
|
||||
|
||||
function stableSerialize(value) {
|
||||
try {
|
||||
return JSON.stringify(value);
|
||||
} catch {
|
||||
return "";
|
||||
}
|
||||
}
|
||||
|
||||
function readRuntimeTimestamp(value) {
|
||||
if (!value || typeof value !== "object" || Array.isArray(value)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
const candidates = [
|
||||
value.updatedAt,
|
||||
value.at,
|
||||
value.createdAt,
|
||||
value.completedAt,
|
||||
value.lastUpdatedAt,
|
||||
value.timestamp,
|
||||
];
|
||||
|
||||
for (const candidate of candidates) {
|
||||
const parsed = Number(candidate);
|
||||
if (Number.isFinite(parsed) && parsed > 0) {
|
||||
return Math.floor(parsed);
|
||||
}
|
||||
|
||||
if (typeof candidate === "string") {
|
||||
const dateValue = Date.parse(candidate);
|
||||
if (Number.isFinite(dateValue) && dateValue > 0) {
|
||||
return Math.floor(dateValue);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
function chooseNewerRuntimePayload(localValue, remoteValue) {
|
||||
const local = toSerializableData(localValue, null);
|
||||
const remote = toSerializableData(remoteValue, null);
|
||||
|
||||
if (local == null) return remote;
|
||||
if (remote == null) return local;
|
||||
|
||||
if (stableSerialize(local) === stableSerialize(remote)) {
|
||||
return local;
|
||||
}
|
||||
|
||||
const localTimestamp = readRuntimeTimestamp(local);
|
||||
const remoteTimestamp = readRuntimeTimestamp(remote);
|
||||
if (remoteTimestamp > localTimestamp) return remote;
|
||||
if (localTimestamp > remoteTimestamp) return local;
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
function pickMinFinite(values = [], fallbackValue = null) {
|
||||
const normalized = values.filter(Number.isFinite);
|
||||
if (!normalized.length) return fallbackValue;
|
||||
return Math.min(...normalized);
|
||||
}
|
||||
|
||||
function normalizeRuntimeHistoryMeta(value = {}, fallbackChatId = "") {
|
||||
const input =
|
||||
value && typeof value === "object" && !Array.isArray(value)
|
||||
? toSerializableData(value, {})
|
||||
: {};
|
||||
|
||||
return {
|
||||
...input,
|
||||
chatId: normalizeChatId(input.chatId || fallbackChatId),
|
||||
lastProcessedAssistantFloor: Number.isFinite(Number(input.lastProcessedAssistantFloor))
|
||||
? Math.floor(Number(input.lastProcessedAssistantFloor))
|
||||
: -1,
|
||||
extractionCount: normalizeNonNegativeInteger(input.extractionCount, 0),
|
||||
processedMessageHashes: normalizeProcessedMessageHashes(input.processedMessageHashes),
|
||||
historyDirtyFrom: normalizeOptionalFloor(input.historyDirtyFrom),
|
||||
lastMutationReason:
|
||||
typeof input.lastMutationReason === "string" ? input.lastMutationReason : "",
|
||||
lastMutationSource:
|
||||
typeof input.lastMutationSource === "string" ? input.lastMutationSource : "",
|
||||
lastRecoveryResult: toSerializableData(input.lastRecoveryResult, null),
|
||||
lastBatchStatus: toSerializableData(input.lastBatchStatus, null),
|
||||
};
|
||||
}
|
||||
|
||||
function mergeRuntimeHistoryMeta(localMeta = {}, remoteMeta = {}, options = {}) {
|
||||
const localHistory = normalizeRuntimeHistoryMeta(localMeta, options.chatId);
|
||||
const remoteHistory = normalizeRuntimeHistoryMeta(remoteMeta, options.chatId);
|
||||
|
||||
const fallbackLastProcessedFloor = Number.isFinite(Number(options.fallbackLastProcessedFloor))
|
||||
? Math.floor(Number(options.fallbackLastProcessedFloor))
|
||||
: -1;
|
||||
const fallbackExtractionCount = normalizeNonNegativeInteger(options.fallbackExtractionCount, 0);
|
||||
|
||||
const baseLastProcessedFloor = Math.max(
|
||||
localHistory.lastProcessedAssistantFloor,
|
||||
remoteHistory.lastProcessedAssistantFloor,
|
||||
fallbackLastProcessedFloor,
|
||||
);
|
||||
|
||||
const mergedHashes = {};
|
||||
const conflictFloors = [];
|
||||
const floorSet = new Set([
|
||||
...Object.keys(localHistory.processedMessageHashes),
|
||||
...Object.keys(remoteHistory.processedMessageHashes),
|
||||
]);
|
||||
const sortedFloors = Array.from(floorSet)
|
||||
.map((value) => Number.parseInt(value, 10))
|
||||
.filter(Number.isFinite)
|
||||
.sort((left, right) => left - right);
|
||||
|
||||
for (const floor of sortedFloors) {
|
||||
const floorKey = String(floor);
|
||||
const localHash = localHistory.processedMessageHashes[floorKey];
|
||||
const remoteHash = remoteHistory.processedMessageHashes[floorKey];
|
||||
if (localHash && remoteHash && localHash !== remoteHash) {
|
||||
conflictFloors.push(floor);
|
||||
continue;
|
||||
}
|
||||
if (localHash || remoteHash) {
|
||||
mergedHashes[floorKey] = localHash || remoteHash;
|
||||
}
|
||||
}
|
||||
|
||||
let safeLastProcessedFloor = baseLastProcessedFloor;
|
||||
const hasIntegrityConflict = conflictFloors.length > 0;
|
||||
if (hasIntegrityConflict) {
|
||||
const highestConflictFreeFloor = sortedFloors.length
|
||||
? sortedFloors[sortedFloors.length - 1]
|
||||
: -1;
|
||||
const firstConflictFloor = Math.min(...conflictFloors);
|
||||
safeLastProcessedFloor = Math.min(
|
||||
baseLastProcessedFloor,
|
||||
highestConflictFreeFloor,
|
||||
firstConflictFloor - 1,
|
||||
);
|
||||
}
|
||||
safeLastProcessedFloor = Math.max(-1, safeLastProcessedFloor);
|
||||
|
||||
const historyDirtyFrom = pickMinFinite(
|
||||
[
|
||||
localHistory.historyDirtyFrom,
|
||||
remoteHistory.historyDirtyFrom,
|
||||
hasIntegrityConflict ? Math.max(0, safeLastProcessedFloor + 1) : null,
|
||||
],
|
||||
null,
|
||||
);
|
||||
|
||||
const firstConflictFloor = hasIntegrityConflict ? Math.min(...conflictFloors) : null;
|
||||
const mergedHistory = {
|
||||
...localHistory,
|
||||
...remoteHistory,
|
||||
chatId: normalizeChatId(remoteHistory.chatId || localHistory.chatId || options.chatId),
|
||||
lastProcessedAssistantFloor: safeLastProcessedFloor,
|
||||
extractionCount: Math.max(
|
||||
localHistory.extractionCount,
|
||||
remoteHistory.extractionCount,
|
||||
fallbackExtractionCount,
|
||||
),
|
||||
processedMessageHashes: sortProcessedMessageHashes(mergedHashes),
|
||||
historyDirtyFrom,
|
||||
lastMutationReason: hasIntegrityConflict
|
||||
? `sync-merge:processed-hash-conflict@${firstConflictFloor}`
|
||||
: String(remoteHistory.lastMutationReason || localHistory.lastMutationReason || ""),
|
||||
lastMutationSource: hasIntegrityConflict
|
||||
? "sync-merge"
|
||||
: String(remoteHistory.lastMutationSource || localHistory.lastMutationSource || ""),
|
||||
lastRecoveryResult: chooseNewerRuntimePayload(
|
||||
localHistory.lastRecoveryResult,
|
||||
remoteHistory.lastRecoveryResult,
|
||||
),
|
||||
lastBatchStatus: chooseNewerRuntimePayload(
|
||||
localHistory.lastBatchStatus,
|
||||
remoteHistory.lastBatchStatus,
|
||||
),
|
||||
};
|
||||
|
||||
return {
|
||||
history: mergedHistory,
|
||||
hasIntegrityConflict,
|
||||
safeLastProcessedFloor,
|
||||
conflictFloors,
|
||||
};
|
||||
}
|
||||
|
||||
function normalizeRuntimeVectorMeta(value = {}) {
|
||||
const input =
|
||||
value && typeof value === "object" && !Array.isArray(value)
|
||||
? toSerializableData(value, {})
|
||||
: {};
|
||||
|
||||
const localStats =
|
||||
input.lastStats && typeof input.lastStats === "object" && !Array.isArray(input.lastStats)
|
||||
? input.lastStats
|
||||
: {};
|
||||
|
||||
return {
|
||||
...input,
|
||||
mode: typeof input.mode === "string" ? input.mode : "",
|
||||
collectionId: typeof input.collectionId === "string" ? input.collectionId : "",
|
||||
source: typeof input.source === "string" ? input.source : "",
|
||||
modelScope: typeof input.modelScope === "string" ? input.modelScope : "",
|
||||
hashToNodeId: normalizeStringMap(input.hashToNodeId),
|
||||
nodeToHash: normalizeStringMap(input.nodeToHash),
|
||||
dirty: Boolean(input.dirty),
|
||||
replayRequiredNodeIds: normalizeStringArray(input.replayRequiredNodeIds),
|
||||
dirtyReason: typeof input.dirtyReason === "string" ? input.dirtyReason : "",
|
||||
pendingRepairFromFloor: normalizeOptionalFloor(input.pendingRepairFromFloor),
|
||||
lastSyncAt: normalizeTimestamp(input.lastSyncAt, 0),
|
||||
lastStats: {
|
||||
total: normalizeNonNegativeInteger(localStats.total, 0),
|
||||
indexed: normalizeNonNegativeInteger(localStats.indexed, 0),
|
||||
stale: normalizeNonNegativeInteger(localStats.stale, 0),
|
||||
pending: normalizeNonNegativeInteger(localStats.pending, 0),
|
||||
},
|
||||
lastWarning: typeof input.lastWarning === "string" ? input.lastWarning : "",
|
||||
};
|
||||
}
|
||||
|
||||
function mergeRuntimeVectorMeta(localMeta = {}, remoteMeta = {}, options = {}) {
|
||||
const localVector = normalizeRuntimeVectorMeta(localMeta);
|
||||
const remoteVector = normalizeRuntimeVectorMeta(remoteMeta);
|
||||
|
||||
const aliveNodeIds = new Set(
|
||||
(Array.isArray(options.mergedNodes) ? options.mergedNodes : [])
|
||||
.map((node) => String(node?.id || "").trim())
|
||||
.filter(Boolean),
|
||||
);
|
||||
|
||||
const conflictNodeIds = new Set();
|
||||
const candidateHashByNode = new Map();
|
||||
const registerCandidate = (nodeId, hash) => {
|
||||
const normalizedNodeId = String(nodeId || "").trim();
|
||||
const normalizedHash = String(hash || "").trim();
|
||||
if (!normalizedNodeId || !normalizedHash || !aliveNodeIds.has(normalizedNodeId)) return;
|
||||
if (conflictNodeIds.has(normalizedNodeId)) return;
|
||||
const existingHash = candidateHashByNode.get(normalizedNodeId);
|
||||
if (!existingHash) {
|
||||
candidateHashByNode.set(normalizedNodeId, normalizedHash);
|
||||
return;
|
||||
}
|
||||
if (existingHash !== normalizedHash) {
|
||||
conflictNodeIds.add(normalizedNodeId);
|
||||
candidateHashByNode.delete(normalizedNodeId);
|
||||
}
|
||||
};
|
||||
|
||||
for (const [nodeId, hash] of Object.entries(localVector.nodeToHash)) {
|
||||
registerCandidate(nodeId, hash);
|
||||
}
|
||||
for (const [nodeId, hash] of Object.entries(remoteVector.nodeToHash)) {
|
||||
registerCandidate(nodeId, hash);
|
||||
}
|
||||
for (const [hash, nodeId] of Object.entries(localVector.hashToNodeId)) {
|
||||
registerCandidate(nodeId, hash);
|
||||
}
|
||||
for (const [hash, nodeId] of Object.entries(remoteVector.hashToNodeId)) {
|
||||
registerCandidate(nodeId, hash);
|
||||
}
|
||||
|
||||
for (const nodeId of conflictNodeIds) {
|
||||
candidateHashByNode.delete(nodeId);
|
||||
}
|
||||
|
||||
const hashBuckets = new Map();
|
||||
for (const [nodeId, hash] of candidateHashByNode.entries()) {
|
||||
const bucket = hashBuckets.get(hash) || new Set();
|
||||
bucket.add(nodeId);
|
||||
hashBuckets.set(hash, bucket);
|
||||
}
|
||||
|
||||
const mergedNodeToHash = {};
|
||||
const mergedHashToNodeId = {};
|
||||
for (const [hash, bucket] of hashBuckets.entries()) {
|
||||
const nodeIds = Array.from(bucket).filter((nodeId) => aliveNodeIds.has(nodeId));
|
||||
if (nodeIds.length !== 1) {
|
||||
for (const nodeId of nodeIds) {
|
||||
conflictNodeIds.add(nodeId);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
const nodeId = nodeIds[0];
|
||||
mergedNodeToHash[nodeId] = hash;
|
||||
mergedHashToNodeId[hash] = nodeId;
|
||||
}
|
||||
|
||||
const replayRequiredNodeIds = normalizeStringArray([
|
||||
...localVector.replayRequiredNodeIds,
|
||||
...remoteVector.replayRequiredNodeIds,
|
||||
...Array.from(conflictNodeIds),
|
||||
]).filter((nodeId) => aliveNodeIds.has(nodeId));
|
||||
|
||||
const hasMappingConflict = conflictNodeIds.size > 0;
|
||||
const inheritedDirty = Boolean(localVector.dirty || remoteVector.dirty);
|
||||
const dirty = inheritedDirty || hasMappingConflict || replayRequiredNodeIds.length > 0;
|
||||
const fallbackRepairFloor = Number.isFinite(Number(options.fallbackLastProcessedFloor))
|
||||
? Math.max(0, Math.floor(Number(options.fallbackLastProcessedFloor)))
|
||||
: 0;
|
||||
|
||||
const pendingRepairFromFloor = dirty
|
||||
? pickMinFinite(
|
||||
[
|
||||
localVector.pendingRepairFromFloor,
|
||||
remoteVector.pendingRepairFromFloor,
|
||||
hasMappingConflict ? fallbackRepairFloor : null,
|
||||
],
|
||||
null,
|
||||
)
|
||||
: null;
|
||||
|
||||
const mappingCount = Object.keys(mergedNodeToHash).length;
|
||||
const total = Math.max(mappingCount, localVector.lastStats.total, remoteVector.lastStats.total);
|
||||
const indexed = mappingCount;
|
||||
const stale = Math.max(0, total - indexed);
|
||||
const pending = dirty
|
||||
? Math.max(
|
||||
replayRequiredNodeIds.length,
|
||||
localVector.lastStats.pending,
|
||||
remoteVector.lastStats.pending,
|
||||
hasMappingConflict ? 1 : 0,
|
||||
)
|
||||
: 0;
|
||||
|
||||
return {
|
||||
...localVector,
|
||||
...remoteVector,
|
||||
mode: String(remoteVector.mode || localVector.mode || "").trim(),
|
||||
source: String(remoteVector.source || localVector.source || "").trim(),
|
||||
modelScope: String(remoteVector.modelScope || localVector.modelScope || "").trim(),
|
||||
collectionId: String(remoteVector.collectionId || localVector.collectionId || "").trim(),
|
||||
hashToNodeId: mergedHashToNodeId,
|
||||
nodeToHash: mergedNodeToHash,
|
||||
replayRequiredNodeIds,
|
||||
dirty,
|
||||
dirtyReason: hasMappingConflict
|
||||
? "sync-merge-vector-conflict"
|
||||
: dirty
|
||||
? String(
|
||||
remoteVector.dirtyReason ||
|
||||
localVector.dirtyReason ||
|
||||
"sync-merge-vector-replay-required",
|
||||
)
|
||||
: "",
|
||||
pendingRepairFromFloor,
|
||||
lastSyncAt: Math.max(localVector.lastSyncAt, remoteVector.lastSyncAt),
|
||||
lastStats: {
|
||||
total,
|
||||
indexed,
|
||||
stale,
|
||||
pending,
|
||||
},
|
||||
lastWarning: hasMappingConflict
|
||||
? "同步合并检测到向量映射冲突,已标记待重建"
|
||||
: String(remoteVector.lastWarning || localVector.lastWarning || ""),
|
||||
};
|
||||
}
|
||||
|
||||
function normalizeJournalEntry(entry) {
|
||||
if (!entry || typeof entry !== "object" || Array.isArray(entry)) return null;
|
||||
const normalizedId = String(entry.id || "").trim();
|
||||
if (!normalizedId) return null;
|
||||
|
||||
const range = Array.isArray(entry.processedRange) ? entry.processedRange : [];
|
||||
const rangeStart = Number(range[0]);
|
||||
const rangeEnd = Number(range[1]);
|
||||
if (!Number.isFinite(rangeStart) || !Number.isFinite(rangeEnd) || rangeStart > rangeEnd) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return {
|
||||
...toSerializableData(entry, entry),
|
||||
id: normalizedId,
|
||||
createdAt: normalizeTimestamp(entry.createdAt ?? entry.at, 0),
|
||||
processedRange: [Math.floor(rangeStart), Math.floor(rangeEnd)],
|
||||
};
|
||||
}
|
||||
|
||||
function chooseJournalEntryWinner(localEntry, remoteEntry) {
|
||||
if (!localEntry) return remoteEntry || null;
|
||||
if (!remoteEntry) return localEntry || null;
|
||||
|
||||
if (remoteEntry.createdAt > localEntry.createdAt) return remoteEntry;
|
||||
if (localEntry.createdAt > remoteEntry.createdAt) return localEntry;
|
||||
|
||||
const localEnd = Number(localEntry.processedRange?.[1] ?? -1);
|
||||
const remoteEnd = Number(remoteEntry.processedRange?.[1] ?? -1);
|
||||
if (remoteEnd > localEnd) return remoteEntry;
|
||||
if (localEnd > remoteEnd) return localEntry;
|
||||
return remoteEntry;
|
||||
}
|
||||
|
||||
function mergeRuntimeBatchJournal(localJournal = [], remoteJournal = [], options = {}) {
|
||||
const journalById = new Map();
|
||||
const register = (entry) => {
|
||||
const normalizedEntry = normalizeJournalEntry(entry);
|
||||
if (!normalizedEntry) return;
|
||||
const existing = journalById.get(normalizedEntry.id);
|
||||
const winner = chooseJournalEntryWinner(existing, normalizedEntry);
|
||||
if (winner) journalById.set(normalizedEntry.id, winner);
|
||||
};
|
||||
|
||||
for (const entry of Array.isArray(localJournal) ? localJournal : []) {
|
||||
register(entry);
|
||||
}
|
||||
for (const entry of Array.isArray(remoteJournal) ? remoteJournal : []) {
|
||||
register(entry);
|
||||
}
|
||||
|
||||
let merged = Array.from(journalById.values());
|
||||
const maxTrustedFloor = Number.isFinite(Number(options.maxTrustedFloor))
|
||||
? Math.floor(Number(options.maxTrustedFloor))
|
||||
: null;
|
||||
if (Number.isFinite(maxTrustedFloor)) {
|
||||
merged = merged.filter((entry) => Number(entry.processedRange?.[1]) <= maxTrustedFloor);
|
||||
}
|
||||
|
||||
merged.sort((left, right) => {
|
||||
const leftStart = Number(left.processedRange?.[0] ?? -1);
|
||||
const rightStart = Number(right.processedRange?.[0] ?? -1);
|
||||
const leftEnd = Number(left.processedRange?.[1] ?? -1);
|
||||
const rightEnd = Number(right.processedRange?.[1] ?? -1);
|
||||
return (
|
||||
leftStart - rightStart ||
|
||||
leftEnd - rightEnd ||
|
||||
left.createdAt - right.createdAt ||
|
||||
left.id.localeCompare(right.id)
|
||||
);
|
||||
});
|
||||
|
||||
if (merged.length > RUNTIME_BATCH_JOURNAL_LIMIT) {
|
||||
merged = merged.slice(-RUNTIME_BATCH_JOURNAL_LIMIT);
|
||||
}
|
||||
|
||||
return merged.map((entry) => toSerializableData(entry, entry));
|
||||
}
|
||||
|
||||
function mergeRuntimeLastRecallResult(localSnapshot, remoteSnapshot) {
|
||||
const localRecall = toSerializableData(localSnapshot?.meta?.[RUNTIME_LAST_RECALL_META_KEY], null);
|
||||
const remoteRecall = toSerializableData(remoteSnapshot?.meta?.[RUNTIME_LAST_RECALL_META_KEY], null);
|
||||
const mergedByPayload = chooseNewerRuntimePayload(localRecall, remoteRecall);
|
||||
if (mergedByPayload != null) {
|
||||
return mergedByPayload;
|
||||
}
|
||||
|
||||
const localModified = normalizeTimestamp(localSnapshot?.meta?.lastModified, 0);
|
||||
const remoteModified = normalizeTimestamp(remoteSnapshot?.meta?.lastModified, 0);
|
||||
if (remoteModified > localModified) return remoteRecall;
|
||||
if (localModified > remoteModified) return localRecall;
|
||||
return null;
|
||||
}
|
||||
|
||||
async function getDb(chatId, options = {}) {
|
||||
const normalizedChatId = normalizeChatId(chatId);
|
||||
if (!normalizedChatId) {
|
||||
@@ -312,6 +837,24 @@ async function patchDbMeta(db, patch = {}) {
|
||||
}
|
||||
}
|
||||
|
||||
async function invokeSyncAppliedHook(options = {}, payload = {}) {
|
||||
if (typeof options.onSyncApplied !== "function") {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
await options.onSyncApplied({
|
||||
...(payload || {}),
|
||||
});
|
||||
} catch (error) {
|
||||
console.warn("[ST-BME] 同步后运行时刷新回调失败:", {
|
||||
chatId: String(payload?.chatId || ""),
|
||||
action: String(payload?.action || ""),
|
||||
error,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async function sanitizeFilename(fileName, options = {}) {
|
||||
const fallbackSanitized = String(fileName || "")
|
||||
.replace(/[<>:"/\\|?*\x00-\x1F]/g, "_")
|
||||
@@ -675,6 +1218,12 @@ export async function download(chatId, options = {}) {
|
||||
syncDirtyReason: "",
|
||||
});
|
||||
|
||||
await invokeSyncAppliedHook(options, {
|
||||
chatId: normalizedChatId,
|
||||
action: "download",
|
||||
revision: remoteRevision,
|
||||
});
|
||||
|
||||
return {
|
||||
downloaded: true,
|
||||
exists: true,
|
||||
@@ -720,7 +1269,7 @@ export function mergeSnapshots(localSnapshot, remoteSnapshot, options = {}) {
|
||||
const remoteRevision = normalizeRevision(remote.meta.revision);
|
||||
const mergedRevision = Math.max(localRevision, remoteRevision) + 1;
|
||||
|
||||
const mergedState = {
|
||||
const baseMergedState = {
|
||||
lastProcessedFloor: Math.max(
|
||||
Number(local.state?.lastProcessedFloor ?? -1),
|
||||
Number(remote.state?.lastProcessedFloor ?? -1),
|
||||
@@ -731,9 +1280,88 @@ export function mergeSnapshots(localSnapshot, remoteSnapshot, options = {}) {
|
||||
),
|
||||
};
|
||||
|
||||
const mergedHistoryResult = mergeRuntimeHistoryMeta(
|
||||
local.meta?.[RUNTIME_HISTORY_META_KEY],
|
||||
remote.meta?.[RUNTIME_HISTORY_META_KEY],
|
||||
{
|
||||
chatId: normalizedChatId,
|
||||
fallbackLastProcessedFloor: baseMergedState.lastProcessedFloor,
|
||||
fallbackExtractionCount: baseMergedState.extractionCount,
|
||||
},
|
||||
);
|
||||
|
||||
const mergedLastProcessedFloor = Math.min(
|
||||
Number(baseMergedState.lastProcessedFloor ?? -1),
|
||||
Number(mergedHistoryResult.safeLastProcessedFloor ?? -1),
|
||||
);
|
||||
|
||||
const mergedState = {
|
||||
lastProcessedFloor: Number.isFinite(mergedLastProcessedFloor)
|
||||
? Math.floor(mergedLastProcessedFloor)
|
||||
: -1,
|
||||
extractionCount: Math.max(
|
||||
Number(baseMergedState.extractionCount ?? 0),
|
||||
Number(mergedHistoryResult.history?.extractionCount ?? 0),
|
||||
),
|
||||
};
|
||||
|
||||
const mergedHistoryState = {
|
||||
...mergedHistoryResult.history,
|
||||
chatId: normalizedChatId,
|
||||
lastProcessedAssistantFloor: mergedState.lastProcessedFloor,
|
||||
extractionCount: mergedState.extractionCount,
|
||||
processedMessageHashes: sortProcessedMessageHashes(
|
||||
Object.fromEntries(
|
||||
Object.entries(mergedHistoryResult.history?.processedMessageHashes || {}).filter(
|
||||
([floorKey]) => {
|
||||
const floor = Number.parseInt(floorKey, 10);
|
||||
return Number.isFinite(floor) && floor >= 0 && floor <= mergedState.lastProcessedFloor;
|
||||
},
|
||||
),
|
||||
),
|
||||
),
|
||||
};
|
||||
|
||||
const mergedVectorState = mergeRuntimeVectorMeta(
|
||||
local.meta?.[RUNTIME_VECTOR_META_KEY],
|
||||
remote.meta?.[RUNTIME_VECTOR_META_KEY],
|
||||
{
|
||||
mergedNodes,
|
||||
fallbackLastProcessedFloor: mergedState.lastProcessedFloor,
|
||||
},
|
||||
);
|
||||
|
||||
const mergedBatchJournal = mergeRuntimeBatchJournal(
|
||||
local.meta?.[RUNTIME_BATCH_JOURNAL_META_KEY],
|
||||
remote.meta?.[RUNTIME_BATCH_JOURNAL_META_KEY],
|
||||
{
|
||||
maxTrustedFloor: mergedState.lastProcessedFloor,
|
||||
},
|
||||
);
|
||||
|
||||
const mergedLastRecallResult = mergeRuntimeLastRecallResult(local, remote);
|
||||
|
||||
const mergedLastProcessedSeq = Math.max(
|
||||
normalizeNonNegativeInteger(local.meta?.[RUNTIME_LAST_PROCESSED_SEQ_META_KEY], 0),
|
||||
normalizeNonNegativeInteger(remote.meta?.[RUNTIME_LAST_PROCESSED_SEQ_META_KEY], 0),
|
||||
normalizeNonNegativeInteger(mergedState.lastProcessedFloor, 0),
|
||||
);
|
||||
|
||||
const mergedRuntimeGraphVersion = Math.max(
|
||||
normalizeNonNegativeInteger(local.meta?.[RUNTIME_GRAPH_VERSION_META_KEY], 0),
|
||||
normalizeNonNegativeInteger(remote.meta?.[RUNTIME_GRAPH_VERSION_META_KEY], 0),
|
||||
normalizeNonNegativeInteger(mergedRevision, 0),
|
||||
);
|
||||
|
||||
const mergedMeta = {
|
||||
...local.meta,
|
||||
...remote.meta,
|
||||
[RUNTIME_HISTORY_META_KEY]: mergedHistoryState,
|
||||
[RUNTIME_VECTOR_META_KEY]: mergedVectorState,
|
||||
[RUNTIME_BATCH_JOURNAL_META_KEY]: mergedBatchJournal,
|
||||
[RUNTIME_LAST_RECALL_META_KEY]: mergedLastRecallResult,
|
||||
[RUNTIME_LAST_PROCESSED_SEQ_META_KEY]: mergedLastProcessedSeq,
|
||||
[RUNTIME_GRAPH_VERSION_META_KEY]: mergedRuntimeGraphVersion,
|
||||
schemaVersion: Math.max(
|
||||
Number(local.meta?.schemaVersion || 1),
|
||||
Number(remote.meta?.schemaVersion || 1),
|
||||
@@ -749,6 +1377,10 @@ export function mergeSnapshots(localSnapshot, remoteSnapshot, options = {}) {
|
||||
nodeCount: mergedNodes.length,
|
||||
edgeCount: mergedEdges.length,
|
||||
tombstoneCount: mergedTombstones.length,
|
||||
syncDirty: false,
|
||||
syncDirtyReason: "",
|
||||
lastProcessedFloor: mergedState.lastProcessedFloor,
|
||||
extractionCount: mergedState.extractionCount,
|
||||
};
|
||||
|
||||
return {
|
||||
@@ -858,6 +1490,12 @@ export async function syncNow(chatId, options = {}) {
|
||||
syncDirtyReason: "",
|
||||
});
|
||||
|
||||
await invokeSyncAppliedHook(options, {
|
||||
chatId: normalizedChatId,
|
||||
action: "merge",
|
||||
revision: normalizeRevision(mergedSnapshot.meta.revision),
|
||||
});
|
||||
|
||||
return {
|
||||
synced: true,
|
||||
chatId: normalizedChatId,
|
||||
|
||||
Reference in New Issue
Block a user