fix(sync): defer cleanup of stale remote chunks

This commit is contained in:
youzini
2026-06-06 09:36:41 +00:00
parent 017e801767
commit dba53cc21c
2 changed files with 430 additions and 0 deletions

View File

@@ -12,6 +12,8 @@ const BME_REMOTE_SYNC_FORMAT_VERSION_V2 = 2;
// Chunk sizing for remote sync payloads (presumably records per chunk file — confirm against the chunking callers).
const BME_REMOTE_SYNC_NODE_CHUNK_SIZE = 2000;
const BME_REMOTE_SYNC_EDGE_CHUNK_SIZE = 4000;
const BME_REMOTE_SYNC_TOMBSTONE_CHUNK_SIZE = 2000;
// Default grace period (24 hours, in milliseconds) between a chunk file dropping out of
// the manifest and that file becoming eligible for deletion.
const BME_REMOTE_SYNC_CHUNK_GC_GRACE_MS = 24 * 60 * 60 * 1000;
// Upper bound on the number of pending-deletion entries kept in a manifest's chunkGc.pending list.
const BME_REMOTE_SYNC_CHUNK_GC_MAX_PENDING = 512;
// Naming and schema constants for backup files.
const BME_BACKUP_FILE_PREFIX = "ST-BME_backup_";
const BME_BACKUP_MANIFEST_FILENAME = "ST-BME_BackupManifest.json";
const BME_BACKUP_SCHEMA_VERSION = 1;
@@ -1318,6 +1320,182 @@ function buildRemoteChunkFilename(baseFilename, kind, index, payload) {
return `${normalizedBase}.__${normalizedKind}.${String(index).padStart(3, "0")}.${hash}.json`;
}
/**
 * Tells whether `filename` follows the chunk naming scheme derived from `baseFilename`:
 * `<base>.__<kind>.<3-digit index>.<alphanumeric hash>.json`, where `<base>` is the
 * normalized base name without a trailing `.json` and `<kind>` is one of
 * nodes / edges / tombstones / runtime-meta.
 *
 * @param {string} [filename=""] - Candidate chunk filename.
 * @param {string} [baseFilename=""] - Manifest filename the chunk must belong to.
 * @returns {boolean} `true` only when both names normalize to non-empty values and the
 *   candidate matches the pattern in full.
 */
function isRemoteSyncChunkFilenameForBase(filename = "", baseFilename = "") {
  const candidate = normalizeRemoteFileName(filename);
  const stem = normalizeRemoteFileName(baseFilename).replace(/\.json$/i, "");
  if (!(candidate && stem)) {
    return false;
  }
  // Escape regex metacharacters so the stem is matched literally.
  const stemPattern = stem.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
  const chunkPattern = new RegExp(
    `^${stemPattern}\\.__(nodes|edges|tombstones|runtime-meta)\\.\\d{3}\\.[A-Za-z0-9]+\\.json$`,
  );
  return chunkPattern.test(candidate);
}
/**
 * Gathers the chunk filenames referenced by a v2 sync manifest.
 *
 * @param {object} [manifest={}] - Manifest whose `chunks` array is inspected.
 * @param {string} [baseFilename=""] - Manifest filename; entries that do not follow its
 *   chunk naming scheme are ignored.
 * @returns {Set<string>} Resolved chunk filenames; empty when the manifest is not format
 *   version 2 or has no `chunks` array.
 */
function collectRemoteSyncChunkFilenames(manifest = {}, baseFilename = "") {
  const collected = new Set();
  const isV2 = Number(manifest?.formatVersion || 0) === BME_REMOTE_SYNC_FORMAT_VERSION_V2;
  const chunkList = isV2 && Array.isArray(manifest?.chunks) ? manifest.chunks : [];
  for (let index = 0; index < chunkList.length; index += 1) {
    const resolved = resolveRemoteFileName(chunkList[index]?.filename || "");
    if (isRemoteSyncChunkFilenameForBase(resolved, baseFilename)) {
      collected.add(resolved);
    }
  }
  return collected;
}
/**
 * Loads the manifest currently stored under `filename` so the next upload can work out
 * which chunk files it is about to supersede.
 *
 * @param {string} [filename=""] - Remote manifest filename.
 * @param {object} [options={}] - Forwarded unchanged to `readRemoteJsonFileResult`.
 * @returns {Promise<object|null>} The stored payload when it is a format-version-2
 *   manifest; `null` when the read reports status 404, when the read is not ok (a warning
 *   is logged), or when the payload is not format version 2.
 */
async function readPreviousRemoteSyncManifest(filename = "", options = {}) {
  const response = await readRemoteJsonFileResult(filename, options);
  if (response.status === 404) {
    return null;
  }
  if (response.ok) {
    const manifest = response.payload;
    const version = Number(manifest?.formatVersion || 0);
    return version === BME_REMOTE_SYNC_FORMAT_VERSION_V2 ? manifest : null;
  }
  // Best effort: an unreadable previous manifest only means no stale chunks get queued.
  const failure = response.reason || response.error || response.status;
  console.warn("[ST-BME] 读取旧同步 manifest 失败,跳过旧 chunk 清理:", failure);
  return null;
}
/**
 * Validates one persisted pending-deletion record and fills in missing fields.
 *
 * @param {object} [entry={}] - Raw record read from a manifest's `chunkGc.pending` list.
 * @param {string} [baseFilename=""] - Manifest filename the chunk must belong to.
 * @returns {{filename: string, firstSeenAt: number, eligibleAt: number, sourceRevision: *}|null}
 *   The normalized record, or `null` when the filename is not a chunk of `baseFilename`.
 *   `firstSeenAt` falls back to the current time and `eligibleAt` to `firstSeenAt` plus
 *   the default grace period.
 */
function normalizeRemoteSyncChunkGcPendingEntry(entry = {}, baseFilename = "") {
  const chunkName = resolveRemoteFileName(entry?.filename || "");
  const belongsToBase = isRemoteSyncChunkFilenameForBase(chunkName, baseFilename);
  if (!belongsToBase) {
    return null;
  }
  const seenAt = normalizeTimestamp(entry?.firstSeenAt, Date.now());
  const defaultEligibleAt = seenAt + BME_REMOTE_SYNC_CHUNK_GC_GRACE_MS;
  const deletableAt = normalizeTimestamp(entry?.eligibleAt, defaultEligibleAt);
  const revision = normalizeRevision(entry?.sourceRevision);
  return {
    filename: chunkName,
    firstSeenAt: seenAt,
    eligibleAt: deletableAt,
    sourceRevision: revision,
  };
}
/**
 * Reads the pending-deletion list stored in a manifest and indexes it by filename.
 *
 * @param {object} [manifest={}] - Manifest whose `chunkGc.pending` array is read.
 * @param {string} [baseFilename=""] - Manifest filename used to validate each entry.
 * @returns {Map<string, object>} Normalized entries keyed by filename. Invalid entries are
 *   dropped, and for duplicated filenames the entry with the smallest `firstSeenAt` wins.
 */
function readRemoteSyncChunkGcPending(manifest = {}, baseFilename = "") {
  const earliestByName = new Map();
  const stored = manifest?.chunkGc?.pending;
  const entries = Array.isArray(stored) ? stored : [];
  for (let index = 0; index < entries.length; index += 1) {
    const candidate = normalizeRemoteSyncChunkGcPendingEntry(entries[index], baseFilename);
    if (candidate) {
      const known = earliestByName.get(candidate.filename);
      const isEarlier = !known || candidate.firstSeenAt < known.firstSeenAt;
      if (isEarlier) {
        earliestByName.set(candidate.filename, candidate);
      }
    }
  }
  return earliestByName;
}
/**
 * Builds the `chunkGc` bookkeeping object stored inside the next manifest: the list of
 * chunk files that are no longer referenced and when each becomes eligible for deletion.
 *
 * Pending entries recorded in the previous manifest are carried forward unless the next
 * manifest references that filename again. Chunks referenced by the previous manifest
 * but not by the next one are queued with `eligibleAt = now + grace`.
 *
 * NOTE(review): nothing here removes an entry after its file has been deleted by a later
 * cleanup pass, so entries stay in the list until the filename is referenced again or
 * the list is truncated to BME_REMOTE_SYNC_CHUNK_GC_MAX_PENDING — confirm this is intended.
 *
 * @param {object|null} [previousManifest=null] - Manifest currently stored remotely, if any.
 * @param {object|null} [nextManifest=null] - Manifest about to be uploaded.
 * @param {string} [baseFilename=""] - Manifest filename used to validate chunk names.
 * @param {object} [options={}] - `nowMs` / `currentTimeMs` override the clock;
 *   `remoteSyncChunkGcGraceMs` overrides the grace period (0 is allowed).
 * @returns {{version: number, updatedAt: number, graceMs: number, pending: object[]}|null}
 *   The GC state, or `null` when `nextManifest` is not format version 2.
 */
function buildRemoteSyncChunkGcState(
  previousManifest = null,
  nextManifest = null,
  baseFilename = "",
  options = {},
) {
  if (Number(nextManifest?.formatVersion || 0) !== BME_REMOTE_SYNC_FORMAT_VERSION_V2) return null;
  const nowMs = normalizeTimestamp(options?.nowMs ?? options?.currentTimeMs, Date.now());
  // A grace value that is not a finite number must fall back to the default rather than
  // to 0: a zero grace makes freshly superseded chunks deletable immediately.
  const requestedGraceMs = Number(
    options?.remoteSyncChunkGcGraceMs ?? BME_REMOTE_SYNC_CHUNK_GC_GRACE_MS,
  );
  const graceMs = Number.isFinite(requestedGraceMs)
    ? Math.max(0, Math.floor(requestedGraceMs))
    : BME_REMOTE_SYNC_CHUNK_GC_GRACE_MS;
  const nextChunks = collectRemoteSyncChunkFilenames(nextManifest, baseFilename);
  const pendingByFilename = readRemoteSyncChunkGcPending(previousManifest, baseFilename);
  // A chunk that is referenced again must never stay queued for deletion.
  for (const filename of nextChunks) {
    pendingByFilename.delete(filename);
  }
  const previousChunks = collectRemoteSyncChunkFilenames(previousManifest, baseFilename);
  const previousRevision = normalizeRevision(previousManifest?.meta?.revision);
  for (const filename of previousChunks) {
    // Already-queued entries keep their original firstSeenAt / eligibleAt.
    if (nextChunks.has(filename) || pendingByFilename.has(filename)) continue;
    pendingByFilename.set(filename, {
      filename,
      firstSeenAt: nowMs,
      eligibleAt: nowMs + graceMs,
      sourceRevision: previousRevision,
    });
  }
  // Soonest-eligible first (filename as tie-breaker) so truncation keeps the oldest debts.
  const pending = [...pendingByFilename.values()]
    .sort((left, right) => left.eligibleAt - right.eligibleAt || left.filename.localeCompare(right.filename))
    .slice(0, BME_REMOTE_SYNC_CHUNK_GC_MAX_PENDING);
  return {
    version: 1,
    updatedAt: nowMs,
    graceMs,
    pending,
  };
}
/**
 * Compares two manifests by their `stableSerialize` output.
 *
 * @param {object} [left={}] - First manifest.
 * @param {object} [right={}] - Second manifest.
 * @returns {boolean} `true` when both serialize to the same string.
 */
function areRemoteSyncManifestsEquivalent(left = {}, right = {}) {
  const serializedLeft = stableSerialize(left);
  const serializedRight = stableSerialize(right);
  return serializedLeft === serializedRight;
}
/**
 * Deletes chunk files queued in `expectedManifest.chunkGc.pending` whose grace period
 * has elapsed, after confirming that the remote manifest is still the one just written.
 *
 * Nothing is deleted when cleanup is disabled, when an authority blob adapter is in use,
 * when the manifest is not format version 2, when no entry is past its `eligibleAt`,
 * or when the remote manifest cannot be read or differs from `expectedManifest`.
 * Individual delete failures are logged and counted; they never reject the promise.
 *
 * @param {object|null} [expectedManifest=null] - Manifest that was just uploaded.
 * @param {string} [baseFilename=""] - Remote manifest filename.
 * @param {object} [options={}] - `disableRemoteSyncChunkCleanup`, `nowMs` /
 *   `currentTimeMs`, plus whatever the remote read/delete helpers consume.
 * @returns {Promise<{attempted: number, deleted: number, skipped: number, failed: number,
 *   reason: string, ms: number}>} Counters plus the reason the pass ended:
 *   `"completed"` after a deletion pass, otherwise the reason it was skipped.
 */
async function cleanupEligibleRemoteSyncChunks(
  expectedManifest = null,
  baseFilename = "",
  options = {},
) {
  const cleanupStartedAt = readSyncTimingNow();
  const elapsedMs = () => normalizeSyncTimingMs(readSyncTimingNow() - cleanupStartedAt);
  const empty = (reason = "not-needed") => ({
    attempted: 0,
    deleted: 0,
    skipped: 0,
    failed: 0,
    reason,
    ms: elapsedMs(),
  });
  if (options?.disableRemoteSyncChunkCleanup === true) return empty("disabled");
  if (getAuthorityBlobAdapter(options)) return empty("authority-blob-skip");
  if (Number(expectedManifest?.formatVersion || 0) !== BME_REMOTE_SYNC_FORMAT_VERSION_V2) {
    return empty("non-v2-manifest");
  }
  const pending = readRemoteSyncChunkGcPending(expectedManifest, baseFilename);
  if (!pending.size) return empty("no-pending-chunks");
  // Decide eligibility before touching the network: while every entry is still inside
  // its grace period there is nothing to delete, so the manifest re-read is skipped.
  const nowMs = normalizeTimestamp(options?.nowMs ?? options?.currentTimeMs, Date.now());
  const duePending = [...pending.values()].filter((entry) => entry.eligibleAt <= nowMs);
  if (duePending.length === 0) return empty("no-eligible-chunks");
  // Re-read the remote manifest and only delete if it is still the one we wrote.
  const currentResult = await readRemoteJsonFileResult(baseFilename, options);
  if (!currentResult.ok) return empty(currentResult.reason || "head-read-failed");
  if (!areRemoteSyncManifestsEquivalent(currentResult.payload, expectedManifest)) {
    return empty("remote-head-changed");
  }
  // Never delete a file the current manifest still references.
  const currentChunks = collectRemoteSyncChunkFilenames(currentResult.payload, baseFilename);
  const eligibleChunks = duePending.filter((entry) => !currentChunks.has(entry.filename));
  let deleted = 0;
  let skipped = 0;
  let failed = 0;
  for (const entry of eligibleChunks) {
    try {
      const result = await deleteRemoteJsonFile(entry.filename, options);
      if (result.deleted) deleted += 1;
      else skipped += 1;
    } catch (error) {
      // Best effort: one failed delete must not abort the remaining ones.
      failed += 1;
      console.warn("[ST-BME] 清理旧同步 chunk 失败:", {
        filename: entry.filename,
        error: error instanceof Error ? error.message : String(error || ""),
      });
    }
  }
  return {
    attempted: eligibleChunks.length,
    deleted,
    skipped,
    failed,
    reason: "completed",
    ms: elapsedMs(),
  };
}
function chunkArray(records = [], chunkSize = 1000) {
const normalizedRecords = Array.isArray(records) ? records : [];
const normalizedChunkSize = Math.max(1, Math.floor(Number(chunkSize) || 1));
@@ -2550,12 +2728,21 @@ async function writeSnapshotToRemote(snapshot, chatId, options = {}) {
const normalizedChatId = normalizeChatId(chatId);
const normalizedSnapshot = normalizeSyncSnapshot(snapshot, normalizedChatId);
const filename = await resolveSyncFilename(normalizedChatId, options);
const previousManifestReadStartedAt = readSyncTimingNow();
const previousManifest = await readPreviousRemoteSyncManifest(filename, options);
const previousManifestReadMs = readSyncTimingNow() - previousManifestReadStartedAt;
const envelopeBuildStartedAt = readSyncTimingNow();
const syncEnvelope = buildRemoteSyncEnvelopeV2(
normalizedSnapshot,
normalizedChatId,
filename,
);
syncEnvelope.manifest.chunkGc = buildRemoteSyncChunkGcState(
previousManifest,
syncEnvelope.manifest,
filename,
options,
);
const envelopeBuildMs = readSyncTimingNow() - envelopeBuildStartedAt;
let chunkSerializeMs = 0;
let chunkUploadMs = 0;
@@ -2573,19 +2760,27 @@ async function writeSnapshotToRemote(snapshot, chatId, options = {}) {
const manifestUploadStartedAt = readSyncTimingNow();
const uploadResult = await writeRemoteJsonFile(filename, manifestPayload, options);
const manifestUploadMs = readSyncTimingNow() - manifestUploadStartedAt;
const cleanupResult = await cleanupEligibleRemoteSyncChunks(
syncEnvelope.manifest,
filename,
options,
);
return {
filename,
path: String(uploadResult?.path || ""),
backend: String(uploadResult?.backend || ""),
payload: syncEnvelope.manifest,
cleanup: cleanupResult,
timings: finalizeSyncTimings(
{
previousManifestReadMs,
envelopeBuildMs,
chunkSerializeMs,
chunkUploadMs,
manifestSerializeMs,
manifestUploadMs,
chunkCleanupMs: Number(cleanupResult?.ms || 0),
responseParseMs: 0,
},
writeStartedAt,
@@ -3123,14 +3318,17 @@ export async function upload(chatId, options = {}) {
filename: uploadResult.filename,
remotePath: uploadResult.path,
revision: normalizeRevision(localSnapshot.meta.revision),
cleanup: uploadResult.cleanup || null,
timings: finalizeSyncTimings(
{
exportMs,
previousManifestReadMs: Number(uploadTimings.previousManifestReadMs || 0),
envelopeBuildMs: Number(uploadTimings.envelopeBuildMs || 0),
chunkSerializeMs: Number(uploadTimings.chunkSerializeMs || 0),
chunkUploadMs: Number(uploadTimings.chunkUploadMs || 0),
manifestSerializeMs: Number(uploadTimings.manifestSerializeMs || 0),
manifestUploadMs: Number(uploadTimings.manifestUploadMs || 0),
chunkCleanupMs: Number(uploadTimings.chunkCleanupMs || 0),
responseParseMs: Number(uploadTimings.responseParseMs || 0),
metaPatchMs,
},

View File

@@ -204,6 +204,43 @@ function createMockFetchEnvironment() {
};
}
/**
 * Creates an in-memory stand-in for an authority blob adapter, for use in tests.
 *
 * @returns {{blobs: Map<string, *>, logs: {reads: number, writes: number, deletes: number},
 *   adapter: object}} `blobs` is the backing store, `logs` counts calls per operation, and
 *   `adapter` exposes async `readJson`, `writeJson`, `writeText` and `delete`. Payloads are
 *   JSON round-tripped on write and on read, so callers never share references with the store.
 */
function createMockAuthorityBlobAdapter() {
  const cloneJson = (value) => JSON.parse(JSON.stringify(value));
  const blobs = new Map();
  const logs = { reads: 0, writes: 0, deletes: 0 };
  const adapter = {
    readJson: async (path) => {
      logs.reads += 1;
      return blobs.has(path)
        ? { exists: true, ok: true, path, payload: cloneJson(blobs.get(path)) }
        : { exists: false, ok: true, path };
    },
    writeJson: async (path, payload) => {
      logs.writes += 1;
      blobs.set(path, cloneJson(payload));
      return { ok: true, path };
    },
    writeText: async (path, payload) => {
      logs.writes += 1;
      // Text payloads are stored parsed, so readJson returns the same shape for both writers.
      blobs.set(path, JSON.parse(payload));
      return { ok: true, path };
    },
    delete: async (path) => {
      logs.deletes += 1;
      const deleted = blobs.delete(path);
      return { ok: true, deleted, path };
    },
  };
  return { blobs, logs, adapter };
}
function buildRuntimeOptions({ dbByChatId, fetch }) {
return {
fetch,
@@ -332,6 +369,198 @@ async function testUploadSanitizesIllegalChatIdFilename() {
assert.match(logs.uploadedPayloads[0].name, /^[A-Za-z0-9._~-]+$/);
}
/**
 * End-to-end check of deferred chunk garbage collection across three uploads of the
 * same chat, using an injected clock (`nowMs`) and a 5000 ms grace period:
 *   1. first upload (nowMs 1000)  - nothing to clean, empty pending list;
 *   2. second upload (nowMs 2000) - superseded chunks are recorded as pending but kept;
 *   3. third upload (nowMs 8000)  - the pending chunks are past their grace period and deleted.
 * Assumes `upload` forwards `nowMs` / `remoteSyncChunkGcGraceMs` to the sync writer.
 */
async function testUploadDefersAndThenCleansStaleRemoteChunks() {
const { fetch, remoteFiles, logs } = createMockFetchEnvironment();
const dbByChatId = new Map();
const chatId = "chat-chunk-gc";
const db = new FakeDb(chatId, {
meta: {
schemaVersion: 1,
chatId,
deviceId: "",
revision: 1,
lastModified: 100,
nodeCount: 1,
edgeCount: 1,
tombstoneCount: 0,
},
nodes: [{ id: "n1", updatedAt: 100, name: "node" }],
edges: [{ id: "e1", fromId: "n1", toId: "n2", updatedAt: 100 }],
tombstones: [],
state: { lastProcessedFloor: 1, extractionCount: 1 },
});
dbByChatId.set(chatId, db);
const runtime = buildRuntimeOptions({ dbByChatId, fetch });
// Upload #1: establishes the initial manifest and chunk set.
const firstUpload = await upload(chatId, {
...runtime,
nowMs: 1_000,
remoteSyncChunkGcGraceMs: 5_000,
});
assert.equal(firstUpload.uploaded, true);
const manifestName = firstUpload.filename;
const firstManifest = remoteFiles.get(manifestName);
const firstChunks = new Set(firstManifest.chunks.map((chunk) => chunk.filename));
assert.ok(firstChunks.size >= 3, "v2 upload should create node, edge, and runtime-meta chunks");
assert.equal(firstUpload.cleanup?.attempted, 0, "first upload has no previous manifest to clean");
assert.deepEqual(firstManifest.chunkGc?.pending || [], []);
// Change the edges and runtime state but keep the node identical, so the second upload
// produces some new chunk files while still sharing at least one with the first.
db.snapshot = {
...JSON.parse(JSON.stringify(db.snapshot)),
meta: {
...db.snapshot.meta,
revision: 2,
lastModified: 200,
},
nodes: [{ id: "n1", updatedAt: 100, name: "node" }],
edges: [{ id: "e2", fromId: "n1", toId: "n3", updatedAt: 200 }],
state: { lastProcessedFloor: 2, extractionCount: 2 },
};
// Upload #2: still inside the grace period of anything it supersedes.
const secondUpload = await upload(chatId, {
...runtime,
nowMs: 2_000,
remoteSyncChunkGcGraceMs: 5_000,
});
assert.equal(secondUpload.uploaded, true);
const secondManifest = remoteFiles.get(manifestName);
const secondChunks = new Set(secondManifest.chunks.map((chunk) => chunk.filename));
// Stale = referenced by the first manifest only; shared = referenced by both.
const staleChunks = [...firstChunks].filter((filename) => !secondChunks.has(filename));
const sharedChunks = [...firstChunks].filter((filename) => secondChunks.has(filename));
assert.ok(staleChunks.length > 0, "changed edge/runtime metadata should create stale chunk files");
assert.ok(sharedChunks.length > 0, "unchanged nodes should keep at least one shared chunk");
for (const filename of staleChunks) {
assert.equal(remoteFiles.has(filename), true, `stale chunk remains during grace period: ${filename}`);
}
for (const filename of sharedChunks) {
assert.equal(remoteFiles.has(filename), true, `shared chunk should remain: ${filename}`);
}
for (const filename of secondChunks) {
assert.equal(remoteFiles.has(filename), true, `current chunk should remain: ${filename}`);
}
// The pending list must contain exactly the stale chunk filenames.
assert.deepEqual(
new Set((secondManifest.chunkGc?.pending || []).map((entry) => entry.filename)),
new Set(staleChunks),
);
assert.equal(secondUpload.cleanup.attempted, 0);
assert.equal(secondUpload.cleanup.deleted, 0);
assert.equal(secondUpload.cleanup.failed, 0);
assert.equal(logs.deleteCalls, 0);
assert.equal(Number.isFinite(secondUpload.timings?.previousManifestReadMs), true);
assert.equal(Number.isFinite(secondUpload.timings?.chunkCleanupMs), true);
// Upload #3: the snapshot is unchanged, but the clock (8000) is now past the pending
// entries' eligibility time (2000 + 5000), so they should be deleted.
const thirdUpload = await upload(chatId, {
...runtime,
nowMs: 8_000,
remoteSyncChunkGcGraceMs: 5_000,
});
assert.equal(thirdUpload.uploaded, true);
const thirdManifest = remoteFiles.get(manifestName);
for (const filename of staleChunks) {
assert.equal(remoteFiles.has(filename), false, `eligible stale chunk should be deleted: ${filename}`);
}
for (const filename of thirdManifest.chunks.map((chunk) => chunk.filename)) {
assert.equal(remoteFiles.has(filename), true, `current chunk should remain after GC: ${filename}`);
}
assert.equal(thirdUpload.cleanup.attempted, staleChunks.length);
assert.equal(thirdUpload.cleanup.deleted, staleChunks.length);
assert.equal(thirdUpload.cleanup.failed, 0);
}
/**
 * Verifies that an upload over a legacy (non-v2) remote manifest deletes nothing: with
 * no v2 manifest as evidence, a chunk-shaped file that merely shares the base filename
 * must be left alone.
 */
async function testUploadSkipsChunkCleanupWhenPreviousManifestUnavailable() {
const { fetch, remoteFiles, logs } = createMockFetchEnvironment();
const dbByChatId = new Map();
const chatId = "chat-chunk-gc-legacy";
const db = new FakeDb(chatId, {
meta: {
schemaVersion: 1,
chatId,
deviceId: "",
revision: 3,
lastModified: 300,
nodeCount: 1,
edgeCount: 0,
tombstoneCount: 0,
},
nodes: [{ id: "n1", updatedAt: 300 }],
edges: [],
tombstones: [],
state: { lastProcessedFloor: 3, extractionCount: 1 },
});
dbByChatId.set(chatId, db);
// Presumably the filename the upload resolves for this chat id — confirm against resolveSyncFilename.
const legacyManifestName = "ST-BME_sync_chat-chunk-gc-legacy.json";
// Named like a chunk of that manifest, but not referenced by any manifest.
const unrelatedOrphanChunk = "ST-BME_sync_chat-chunk-gc-legacy.__edges.000.orphan.json";
// Legacy payload: no `formatVersion` and no `chunks` list.
remoteFiles.set(legacyManifestName, {
meta: { chatId, revision: 1 },
nodes: [],
edges: [],
tombstones: [],
state: { lastProcessedFloor: 0, extractionCount: 0 },
});
remoteFiles.set(unrelatedOrphanChunk, { kind: "edges", records: [{ id: "old" }] });
const result = await upload(chatId, buildRuntimeOptions({ dbByChatId, fetch }));
assert.equal(result.uploaded, true);
assert.equal(result.cleanup?.attempted, 0);
assert.equal(logs.deleteCalls, 0, "non-v2 previous manifest must not trigger speculative deletion");
assert.equal(remoteFiles.has(unrelatedOrphanChunk), true, "orphan chunk cannot be deleted without manifest evidence");
}
/**
 * Verifies that an upload performed with an authority blob adapter skips chunk cleanup
 * entirely: no delete is issued through the fetch-backed store or through the adapter,
 * even with a zero grace period and a v2 manifest already present in the fetch-backed store.
 */
async function testAuthorityBlobUploadDoesNotDeleteUserFilesFallbackChunks() {
const { fetch, remoteFiles, logs } = createMockFetchEnvironment();
const authority = createMockAuthorityBlobAdapter();
const dbByChatId = new Map();
const chatId = "chat-authority-gc";
dbByChatId.set(
chatId,
new FakeDb(chatId, {
meta: {
schemaVersion: 1,
chatId,
deviceId: "",
revision: 1,
lastModified: 100,
nodeCount: 1,
edgeCount: 0,
tombstoneCount: 0,
},
nodes: [{ id: "n1", updatedAt: 100 }],
edges: [],
tombstones: [],
state: { lastProcessedFloor: 1, extractionCount: 1 },
}),
);
// Pre-seed the fetch-backed store with a v2 manifest and the chunk it references.
const fallbackManifest = "ST-BME_sync_chat-authority-gc.json";
const fallbackChunk = "ST-BME_sync_chat-authority-gc.__nodes.000.fallback.json";
remoteFiles.set(fallbackManifest, {
kind: "st-bme-sync",
formatVersion: 2,
chatId,
meta: { chatId, revision: 0, lastModified: 1, nodeCount: 1, edgeCount: 0, tombstoneCount: 0, schemaVersion: 1 },
state: { lastProcessedFloor: 0, extractionCount: 0 },
chunks: [{ kind: "nodes", index: 0, count: 1, filename: fallbackChunk }],
});
remoteFiles.set(fallbackChunk, { kind: "nodes", index: 0, records: [{ id: "fallback" }] });
// Zero grace makes any queued chunk immediately eligible, so only the authority-adapter
// guard can keep deletes from happening.
const result = await upload(chatId, {
...buildRuntimeOptions({ dbByChatId, fetch }),
authorityBlobAdapter: authority.adapter,
authorityBlobFailOpen: true,
nowMs: 10_000,
remoteSyncChunkGcGraceMs: 0,
});
assert.equal(result.uploaded, true);
assert.equal(result.cleanup?.reason, "authority-blob-skip");
assert.equal(logs.deleteCalls, 0, "authority upload must not cross-delete user-files fallback chunks");
assert.equal(authority.logs.deletes, 0, "authority upload should skip chunk GC by default");
assert.equal(remoteFiles.has(fallbackManifest), true);
assert.equal(remoteFiles.has(fallbackChunk), true);
}
async function testDownloadImport() {
const { fetch, remoteFiles } = createMockFetchEnvironment();
const dbByChatId = new Map();
@@ -1439,6 +1668,9 @@ async function main() {
await testRemoteStatusMissing();
await testUploadPayloadMetaFirstAndDebounce();
await testUploadSanitizesIllegalChatIdFilename();
await testUploadDefersAndThenCleansStaleRemoteChunks();
await testUploadSkipsChunkCleanupWhenPreviousManifestUnavailable();
await testAuthorityBlobUploadDoesNotDeleteUserFilesFallbackChunks();
await testDownloadImport();
await testLegacyRemoteFilenameFallbackAndReuse();
await testMergeRules();