From dba53cc21c210c2c394d3936f2a546a9e210be51 Mon Sep 17 00:00:00 2001 From: youzini Date: Sat, 6 Jun 2026 09:36:41 +0000 Subject: [PATCH] fix(sync): defer cleanup of stale remote chunks --- sync/bme-sync.js | 198 +++++++++++++++++++++++++++++++++ tests/indexeddb-sync.mjs | 232 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 430 insertions(+) diff --git a/sync/bme-sync.js b/sync/bme-sync.js index e72c628..ccd3c88 100644 --- a/sync/bme-sync.js +++ b/sync/bme-sync.js @@ -12,6 +12,8 @@ const BME_REMOTE_SYNC_FORMAT_VERSION_V2 = 2; const BME_REMOTE_SYNC_NODE_CHUNK_SIZE = 2000; const BME_REMOTE_SYNC_EDGE_CHUNK_SIZE = 4000; const BME_REMOTE_SYNC_TOMBSTONE_CHUNK_SIZE = 2000; +const BME_REMOTE_SYNC_CHUNK_GC_GRACE_MS = 24 * 60 * 60 * 1000; +const BME_REMOTE_SYNC_CHUNK_GC_MAX_PENDING = 512; const BME_BACKUP_FILE_PREFIX = "ST-BME_backup_"; const BME_BACKUP_MANIFEST_FILENAME = "ST-BME_BackupManifest.json"; const BME_BACKUP_SCHEMA_VERSION = 1; @@ -1318,6 +1320,182 @@ function buildRemoteChunkFilename(baseFilename, kind, index, payload) { return `${normalizedBase}.__${normalizedKind}.${String(index).padStart(3, "0")}.${hash}.json`; } +function isRemoteSyncChunkFilenameForBase(filename = "", baseFilename = "") { + const normalizedFilename = normalizeRemoteFileName(filename); + const normalizedBase = normalizeRemoteFileName(baseFilename).replace(/\.json$/i, ""); + if (!normalizedFilename || !normalizedBase) return false; + const escapedBase = normalizedBase.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); + return new RegExp( + `^${escapedBase}\\.__(nodes|edges|tombstones|runtime-meta)\\.\\d{3}\\.[A-Za-z0-9]+\\.json$`, + ).test(normalizedFilename); +} + +function collectRemoteSyncChunkFilenames(manifest = {}, baseFilename = "") { + if (Number(manifest?.formatVersion || 0) !== BME_REMOTE_SYNC_FORMAT_VERSION_V2) { + return new Set(); + } + const filenames = new Set(); + for (const chunk of Array.isArray(manifest?.chunks) ? manifest.chunks : []) { + const filename = resolveRemoteFileName(chunk?.filename || ""); + if (!isRemoteSyncChunkFilenameForBase(filename, baseFilename)) continue; + filenames.add(filename); + } + return filenames; +} + +async function readPreviousRemoteSyncManifest(filename = "", options = {}) { + const result = await readRemoteJsonFileResult(filename, options); + if (result.status === 404) return null; + if (!result.ok) { + console.warn("[ST-BME] 读取旧同步 manifest 失败,跳过旧 chunk 清理:", result.reason || result.error || result.status); + return null; + } + if (Number(result.payload?.formatVersion || 0) !== BME_REMOTE_SYNC_FORMAT_VERSION_V2) { + return null; + } + return result.payload; +} + +function normalizeRemoteSyncChunkGcPendingEntry(entry = {}, baseFilename = "") { + const filename = resolveRemoteFileName(entry?.filename || ""); + if (!isRemoteSyncChunkFilenameForBase(filename, baseFilename)) return null; + const firstSeenAt = normalizeTimestamp(entry?.firstSeenAt, Date.now()); + const eligibleAt = normalizeTimestamp(entry?.eligibleAt, firstSeenAt + BME_REMOTE_SYNC_CHUNK_GC_GRACE_MS); + return { + filename, + firstSeenAt, + eligibleAt, + sourceRevision: normalizeRevision(entry?.sourceRevision), + }; +} + +function readRemoteSyncChunkGcPending(manifest = {}, baseFilename = "") { + const rawPending = Array.isArray(manifest?.chunkGc?.pending) + ? manifest.chunkGc.pending + : []; + const pendingByFilename = new Map(); + for (const entry of rawPending) { + const normalized = normalizeRemoteSyncChunkGcPendingEntry(entry, baseFilename); + if (!normalized) continue; + const existing = pendingByFilename.get(normalized.filename); + if (!existing || normalized.firstSeenAt < existing.firstSeenAt) { + pendingByFilename.set(normalized.filename, normalized); + } + } + return pendingByFilename; +} + +function buildRemoteSyncChunkGcState( + previousManifest = null, + nextManifest = null, + baseFilename = "", + options = {}, +) { + if (Number(nextManifest?.formatVersion || 0) !== BME_REMOTE_SYNC_FORMAT_VERSION_V2) return null; + + const nowMs = normalizeTimestamp(options.nowMs ?? options.currentTimeMs, Date.now()); + const graceMs = Math.max( + 0, + Math.floor(Number(options.remoteSyncChunkGcGraceMs ?? BME_REMOTE_SYNC_CHUNK_GC_GRACE_MS) || 0), + ); + const nextChunks = collectRemoteSyncChunkFilenames(nextManifest, baseFilename); + const pendingByFilename = readRemoteSyncChunkGcPending(previousManifest, baseFilename); + for (const filename of nextChunks) { + pendingByFilename.delete(filename); + } + + const previousChunks = collectRemoteSyncChunkFilenames(previousManifest, baseFilename); + const previousRevision = normalizeRevision(previousManifest?.meta?.revision); + for (const filename of previousChunks) { + if (nextChunks.has(filename) || pendingByFilename.has(filename)) continue; + pendingByFilename.set(filename, { + filename, + firstSeenAt: nowMs, + eligibleAt: nowMs + graceMs, + sourceRevision: previousRevision, + }); + } + + const pending = [...pendingByFilename.values()] + .filter((entry) => !nextChunks.has(entry.filename)) + .sort((left, right) => left.eligibleAt - right.eligibleAt || left.filename.localeCompare(right.filename)) + .slice(0, BME_REMOTE_SYNC_CHUNK_GC_MAX_PENDING); + + return { + version: 1, + updatedAt: nowMs, + graceMs, + pending, + }; +} + +function areRemoteSyncManifestsEquivalent(left = {}, right = {}) { + return stableSerialize(left) === stableSerialize(right); +} + +async function cleanupEligibleRemoteSyncChunks( + expectedManifest = null, + baseFilename = "", + options = {}, +) { + const cleanupStartedAt = readSyncTimingNow(); + const empty = (reason = "not-needed") => ({ + attempted: 0, + deleted: 0, + skipped: 0, + failed: 0, + reason, + ms: normalizeSyncTimingMs(readSyncTimingNow() - cleanupStartedAt), + }); + + if (options.disableRemoteSyncChunkCleanup === true) return empty("disabled"); + if (getAuthorityBlobAdapter(options)) return empty("authority-blob-skip"); + if (Number(expectedManifest?.formatVersion || 0) !== BME_REMOTE_SYNC_FORMAT_VERSION_V2) { + return empty("non-v2-manifest"); + } + + const pending = readRemoteSyncChunkGcPending(expectedManifest, baseFilename); + if (!pending.size) return empty("no-pending-chunks"); + + const currentResult = await readRemoteJsonFileResult(baseFilename, options); + if (!currentResult.ok) return empty(currentResult.reason || "head-read-failed"); + if (!areRemoteSyncManifestsEquivalent(currentResult.payload, expectedManifest)) { + return empty("remote-head-changed"); + } + + const nowMs = normalizeTimestamp(options.nowMs ?? options.currentTimeMs, Date.now()); + const currentChunks = collectRemoteSyncChunkFilenames(currentResult.payload, baseFilename); + const eligibleChunks = [...pending.values()] + .filter((entry) => entry.eligibleAt <= nowMs) + .filter((entry) => !currentChunks.has(entry.filename)); + + let deleted = 0; + let skipped = 0; + let failed = 0; + + for (const entry of eligibleChunks) { + try { + const result = await deleteRemoteJsonFile(entry.filename, options); + if (result.deleted) deleted += 1; + else skipped += 1; + } catch (error) { + failed += 1; + console.warn("[ST-BME] 清理旧同步 chunk 失败:", { + filename: entry.filename, + error: error instanceof Error ? error.message : String(error || ""), + }); + } + } + + return { + attempted: eligibleChunks.length, + deleted, + skipped, + failed, + ms: normalizeSyncTimingMs(readSyncTimingNow() - cleanupStartedAt), + }; +} + function chunkArray(records = [], chunkSize = 1000) { const normalizedRecords = Array.isArray(records) ? records : []; const normalizedChunkSize = Math.max(1, Math.floor(Number(chunkSize) || 1)); @@ -2550,12 +2728,21 @@ async function writeSnapshotToRemote(snapshot, chatId, options = {}) { const normalizedChatId = normalizeChatId(chatId); const normalizedSnapshot = normalizeSyncSnapshot(snapshot, normalizedChatId); const filename = await resolveSyncFilename(normalizedChatId, options); + const previousManifestReadStartedAt = readSyncTimingNow(); + const previousManifest = await readPreviousRemoteSyncManifest(filename, options); + const previousManifestReadMs = readSyncTimingNow() - previousManifestReadStartedAt; const envelopeBuildStartedAt = readSyncTimingNow(); const syncEnvelope = buildRemoteSyncEnvelopeV2( normalizedSnapshot, normalizedChatId, filename, ); + syncEnvelope.manifest.chunkGc = buildRemoteSyncChunkGcState( + previousManifest, + syncEnvelope.manifest, + filename, + options, + ); const envelopeBuildMs = readSyncTimingNow() - envelopeBuildStartedAt; let chunkSerializeMs = 0; let chunkUploadMs = 0; @@ -2573,19 +2760,27 @@ async function writeSnapshotToRemote(snapshot, chatId, options = {}) { const manifestUploadStartedAt = readSyncTimingNow(); const uploadResult = await writeRemoteJsonFile(filename, manifestPayload, options); const manifestUploadMs = readSyncTimingNow() - manifestUploadStartedAt; + const cleanupResult = await cleanupEligibleRemoteSyncChunks( + syncEnvelope.manifest, + filename, + options, + ); return { filename, path: String(uploadResult?.path || ""), backend: String(uploadResult?.backend || ""), payload: syncEnvelope.manifest, + cleanup: cleanupResult, timings: finalizeSyncTimings( { + previousManifestReadMs, envelopeBuildMs, chunkSerializeMs, chunkUploadMs, manifestSerializeMs, manifestUploadMs, + chunkCleanupMs: Number(cleanupResult?.ms || 0), responseParseMs: 0, }, writeStartedAt, @@ -3123,14 +3318,17 @@ export async function upload(chatId, options = {}) { filename: uploadResult.filename, remotePath: uploadResult.path, revision: normalizeRevision(localSnapshot.meta.revision), + cleanup: uploadResult.cleanup || null, timings: finalizeSyncTimings( { exportMs, + previousManifestReadMs: Number(uploadTimings.previousManifestReadMs || 0), envelopeBuildMs: Number(uploadTimings.envelopeBuildMs || 0), chunkSerializeMs: Number(uploadTimings.chunkSerializeMs || 0), chunkUploadMs: Number(uploadTimings.chunkUploadMs || 0), manifestSerializeMs: Number(uploadTimings.manifestSerializeMs || 0), manifestUploadMs: Number(uploadTimings.manifestUploadMs || 0), + chunkCleanupMs: Number(uploadTimings.chunkCleanupMs || 0), responseParseMs: Number(uploadTimings.responseParseMs || 0), metaPatchMs, }, diff --git a/tests/indexeddb-sync.mjs b/tests/indexeddb-sync.mjs index b373fc8..99f2aef 100644 --- a/tests/indexeddb-sync.mjs +++ b/tests/indexeddb-sync.mjs @@ -204,6 +204,43 @@ function createMockFetchEnvironment() { }; } +function createMockAuthorityBlobAdapter() { + const blobs = new Map(); + const logs = { + reads: 0, + writes: 0, + deletes: 0, + }; + return { + blobs, + logs, + adapter: { + async readJson(path) { + logs.reads += 1; + if (!blobs.has(path)) { + return { exists: false, ok: true, path }; + } + return { exists: true, ok: true, path, payload: JSON.parse(JSON.stringify(blobs.get(path))) }; + }, + async writeJson(path, payload) { + logs.writes += 1; + blobs.set(path, JSON.parse(JSON.stringify(payload))); + return { ok: true, path }; + }, + async writeText(path, payload) { + logs.writes += 1; + blobs.set(path, JSON.parse(payload)); + return { ok: true, path }; + }, + async delete(path) { + logs.deletes += 1; + const existed = blobs.delete(path); + return { ok: true, deleted: existed, path }; + }, + }, + }; +} + function buildRuntimeOptions({ dbByChatId, fetch }) { return { fetch, @@ -332,6 +369,198 @@ async function testUploadSanitizesIllegalChatIdFilename() { assert.match(logs.uploadedPayloads[0].name, /^[A-Za-z0-9._~-]+$/); } +async function testUploadDefersAndThenCleansStaleRemoteChunks() { + const { fetch, remoteFiles, logs } = createMockFetchEnvironment(); + const dbByChatId = new Map(); + const chatId = "chat-chunk-gc"; + const db = new FakeDb(chatId, { + meta: { + schemaVersion: 1, + chatId, + deviceId: "", + revision: 1, + lastModified: 100, + nodeCount: 1, + edgeCount: 1, + tombstoneCount: 0, + }, + nodes: [{ id: "n1", updatedAt: 100, name: "node" }], + edges: [{ id: "e1", fromId: "n1", toId: "n2", updatedAt: 100 }], + tombstones: [], + state: { lastProcessedFloor: 1, extractionCount: 1 }, + }); + dbByChatId.set(chatId, db); + + const runtime = buildRuntimeOptions({ dbByChatId, fetch }); + const firstUpload = await upload(chatId, { + ...runtime, + nowMs: 1_000, + remoteSyncChunkGcGraceMs: 5_000, + }); + assert.equal(firstUpload.uploaded, true); + const manifestName = firstUpload.filename; + const firstManifest = remoteFiles.get(manifestName); + const firstChunks = new Set(firstManifest.chunks.map((chunk) => chunk.filename)); + assert.ok(firstChunks.size >= 3, "v2 upload should create node, edge, and runtime-meta chunks"); + assert.equal(firstUpload.cleanup?.attempted, 0, "first upload has no previous manifest to clean"); + assert.deepEqual(firstManifest.chunkGc?.pending || [], []); + + db.snapshot = { + ...JSON.parse(JSON.stringify(db.snapshot)), + meta: { + ...db.snapshot.meta, + revision: 2, + lastModified: 200, + }, + nodes: [{ id: "n1", updatedAt: 100, name: "node" }], + edges: [{ id: "e2", fromId: "n1", toId: "n3", updatedAt: 200 }], + state: { lastProcessedFloor: 2, extractionCount: 2 }, + }; + + const secondUpload = await upload(chatId, { + ...runtime, + nowMs: 2_000, + remoteSyncChunkGcGraceMs: 5_000, + }); + assert.equal(secondUpload.uploaded, true); + const secondManifest = remoteFiles.get(manifestName); + const secondChunks = new Set(secondManifest.chunks.map((chunk) => chunk.filename)); + const staleChunks = [...firstChunks].filter((filename) => !secondChunks.has(filename)); + const sharedChunks = [...firstChunks].filter((filename) => secondChunks.has(filename)); + + assert.ok(staleChunks.length > 0, "changed edge/runtime metadata should create stale chunk files"); + assert.ok(sharedChunks.length > 0, "unchanged nodes should keep at least one shared chunk"); + for (const filename of staleChunks) { + assert.equal(remoteFiles.has(filename), true, `stale chunk remains during grace period: ${filename}`); + } + for (const filename of sharedChunks) { + assert.equal(remoteFiles.has(filename), true, `shared chunk should remain: ${filename}`); + } + for (const filename of secondChunks) { + assert.equal(remoteFiles.has(filename), true, `current chunk should remain: ${filename}`); + } + assert.deepEqual( + new Set((secondManifest.chunkGc?.pending || []).map((entry) => entry.filename)), + new Set(staleChunks), + ); + assert.equal(secondUpload.cleanup.attempted, 0); + assert.equal(secondUpload.cleanup.deleted, 0); + assert.equal(secondUpload.cleanup.failed, 0); + assert.equal(logs.deleteCalls, 0); + assert.equal(Number.isFinite(secondUpload.timings?.previousManifestReadMs), true); + assert.equal(Number.isFinite(secondUpload.timings?.chunkCleanupMs), true); + + const thirdUpload = await upload(chatId, { + ...runtime, + nowMs: 8_000, + remoteSyncChunkGcGraceMs: 5_000, + }); + assert.equal(thirdUpload.uploaded, true); + const thirdManifest = remoteFiles.get(manifestName); + for (const filename of staleChunks) { + assert.equal(remoteFiles.has(filename), false, `eligible stale chunk should be deleted: ${filename}`); + } + for (const filename of thirdManifest.chunks.map((chunk) => chunk.filename)) { + assert.equal(remoteFiles.has(filename), true, `current chunk should remain after GC: ${filename}`); + } + assert.equal(thirdUpload.cleanup.attempted, staleChunks.length); + assert.equal(thirdUpload.cleanup.deleted, staleChunks.length); + assert.equal(thirdUpload.cleanup.failed, 0); +} + +async function testUploadSkipsChunkCleanupWhenPreviousManifestUnavailable() { + const { fetch, remoteFiles, logs } = createMockFetchEnvironment(); + const dbByChatId = new Map(); + const chatId = "chat-chunk-gc-legacy"; + const db = new FakeDb(chatId, { + meta: { + schemaVersion: 1, + chatId, + deviceId: "", + revision: 3, + lastModified: 300, + nodeCount: 1, + edgeCount: 0, + tombstoneCount: 0, + }, + nodes: [{ id: "n1", updatedAt: 300 }], + edges: [], + tombstones: [], + state: { lastProcessedFloor: 3, extractionCount: 1 }, + }); + dbByChatId.set(chatId, db); + + const legacyManifestName = "ST-BME_sync_chat-chunk-gc-legacy.json"; + const unrelatedOrphanChunk = "ST-BME_sync_chat-chunk-gc-legacy.__edges.000.orphan.json"; + remoteFiles.set(legacyManifestName, { + meta: { chatId, revision: 1 }, + nodes: [], + edges: [], + tombstones: [], + state: { lastProcessedFloor: 0, extractionCount: 0 }, + }); + remoteFiles.set(unrelatedOrphanChunk, { kind: "edges", records: [{ id: "old" }] }); + + const result = await upload(chatId, buildRuntimeOptions({ dbByChatId, fetch })); + assert.equal(result.uploaded, true); + assert.equal(result.cleanup?.attempted, 0); + assert.equal(logs.deleteCalls, 0, "non-v2 previous manifest must not trigger speculative deletion"); + assert.equal(remoteFiles.has(unrelatedOrphanChunk), true, "orphan chunk cannot be deleted without manifest evidence"); +} + +async function testAuthorityBlobUploadDoesNotDeleteUserFilesFallbackChunks() { + const { fetch, remoteFiles, logs } = createMockFetchEnvironment(); + const authority = createMockAuthorityBlobAdapter(); + const dbByChatId = new Map(); + const chatId = "chat-authority-gc"; + dbByChatId.set( + chatId, + new FakeDb(chatId, { + meta: { + schemaVersion: 1, + chatId, + deviceId: "", + revision: 1, + lastModified: 100, + nodeCount: 1, + edgeCount: 0, + tombstoneCount: 0, + }, + nodes: [{ id: "n1", updatedAt: 100 }], + edges: [], + tombstones: [], + state: { lastProcessedFloor: 1, extractionCount: 1 }, + }), + ); + + const fallbackManifest = "ST-BME_sync_chat-authority-gc.json"; + const fallbackChunk = "ST-BME_sync_chat-authority-gc.__nodes.000.fallback.json"; + remoteFiles.set(fallbackManifest, { + kind: "st-bme-sync", + formatVersion: 2, + chatId, + meta: { chatId, revision: 0, lastModified: 1, nodeCount: 1, edgeCount: 0, tombstoneCount: 0, schemaVersion: 1 }, + state: { lastProcessedFloor: 0, extractionCount: 0 }, + chunks: [{ kind: "nodes", index: 0, count: 1, filename: fallbackChunk }], + }); + remoteFiles.set(fallbackChunk, { kind: "nodes", index: 0, records: [{ id: "fallback" }] }); + + const result = await upload(chatId, { + ...buildRuntimeOptions({ dbByChatId, fetch }), + authorityBlobAdapter: authority.adapter, + authorityBlobFailOpen: true, + nowMs: 10_000, + remoteSyncChunkGcGraceMs: 0, + }); + + assert.equal(result.uploaded, true); + assert.equal(result.cleanup?.reason, "authority-blob-skip"); + assert.equal(logs.deleteCalls, 0, "authority upload must not cross-delete user-files fallback chunks"); + assert.equal(authority.logs.deletes, 0, "authority upload should skip chunk GC by default"); + assert.equal(remoteFiles.has(fallbackManifest), true); + assert.equal(remoteFiles.has(fallbackChunk), true); +} + async function testDownloadImport() { const { fetch, remoteFiles } = createMockFetchEnvironment(); const dbByChatId = new Map(); @@ -1439,6 +1668,9 @@ async function main() { await testRemoteStatusMissing(); await testUploadPayloadMetaFirstAndDebounce(); await testUploadSanitizesIllegalChatIdFilename(); + await testUploadDefersAndThenCleansStaleRemoteChunks(); + await testUploadSkipsChunkCleanupWhenPreviousManifestUnavailable(); + await testAuthorityBlobUploadDoesNotDeleteUserFilesFallbackChunks(); await testDownloadImport(); await testLegacyRemoteFilenameFallbackAndReuse(); await testMergeRules();