diff --git a/sync/authority-graph-store.js b/sync/authority-graph-store.js
index 1eb0216..b5c99df 100644
--- a/sync/authority-graph-store.js
+++ b/sync/authority-graph-store.js
@@ -18,6 +18,9 @@ const DEFAULT_AUTHORITY_SQL_DATABASE = "default";
 const AUTHORITY_SQL_QUERY_ENDPOINT = "/sql/query";
 const AUTHORITY_SQL_EXEC_ENDPOINT = "/sql/exec";
 const AUTHORITY_SQL_TRANSACTION_ENDPOINT = "/sql/transaction";
+const AUTHORITY_SQL_TRANSACTION_BATCH_SIZE = 150;
+const AUTHORITY_SQL_TRANSACTION_MAX_REQUEST_BYTES = 1024 * 1024;
+const AUTHORITY_SQL_TRANSACTION_SAFE_REQUEST_BYTES = 512 * 1024;
 
 const AUTHORITY_TABLES = Object.freeze({
   meta: "st_bme_graph_meta",
@@ -85,6 +88,108 @@ function estimatePersistPayloadBytes(value = null) {
   }
 }
 
+function measureJsonBytes(value = null) {
+  let json = "";
+  try {
+    json = JSON.stringify(value ?? null);
+  } catch {
+    json = "";
+  }
+  if (typeof TextEncoder === "function") {
+    return new TextEncoder().encode(json).byteLength;
+  }
+  return json.length * 3;
+}
+
+function normalizeSqlTransactionByteBudget(value) {
+  const parsed = Number(value);
+  if (!Number.isFinite(parsed) || parsed <= 0) return AUTHORITY_SQL_TRANSACTION_SAFE_REQUEST_BYTES;
+  return Math.max(1024, Math.floor(parsed));
+}
+
+function isAuthorityPayloadTooLargeError(error = null) {
+  const status = Number(error?.status || error?.payload?.status || 0);
+  const category = String(error?.category || error?.payload?.category || "").toLowerCase();
+  const code = String(error?.code || error?.payload?.code || error?.payload?.reason || "").toLowerCase();
+  const message = String(error?.message || error?.payload?.error || error?.payload?.message || "").toLowerCase();
+  return (
+    status === 413 ||
+    category === "payload-too-large" ||
+    category === "limit" ||
+    code.includes("length_limit") ||
+    code.includes("payload") ||
+    message.includes("length limit") ||
+    message.includes("payload too large") ||
+    message.includes("request entity too large")
+  );
+}
+
+function createTerminalAuthoritySqlPayloadError(statement = null, cause = null) {
+  const estimatedBytes = estimateAuthoritySqlTransactionRequestBytes([statement], DEFAULT_AUTHORITY_SQL_DATABASE);
+  const error = new Error(
+    `Authority SQL transaction payload is too large for a single graph persistence statement (${estimatedBytes} bytes estimated); refusing endless retry`,
+  );
+  error.name = "AuthoritySqlPayloadTooLargeError";
+  error.status = Number(cause?.status || 413);
+  error.code = "authority_sql_payload_too_large";
+  error.category = "payload-too-large";
+  error.terminal = true;
+  error.nonRetryable = true;
+  error.estimatedBytes = estimatedBytes;
+  error.cause = cause;
+  return error;
+}
+
+function normalizeAuthorityTransactionStatement(statement = {}) {
+  const positional = convertNamedParamsToPositional(String(statement?.sql || ""), statement?.params || {});
+  return {
+    statement: positional.sql,
+    params: positional.params,
+  };
+}
+
+function estimateAuthoritySqlTransactionRequestBytes(statements = [], database = DEFAULT_AUTHORITY_SQL_DATABASE) {
+  return measureJsonBytes({
+    database: normalizeRecordId(database) || DEFAULT_AUTHORITY_SQL_DATABASE,
+    statements: toArray(statements)
+      .filter((statement) => statement?.sql)
+      .map(normalizeAuthorityTransactionStatement),
+  });
+}
+
+function buildAuthoritySqlTransactionChunks(statements = [], options = {}) {
+  const normalizedStatements = toArray(statements).filter((statement) => statement?.sql);
+  const maxStatements = Math.max(1, Math.floor(Number(options.maxStatements) || AUTHORITY_SQL_TRANSACTION_BATCH_SIZE));
+  const maxBytes = normalizeSqlTransactionByteBudget(options.maxBytes);
+  const database = normalizeRecordId(options.database) || DEFAULT_AUTHORITY_SQL_DATABASE;
+  const chunks = [];
+  let current = [];
+
+  const pushCurrent = () => {
+    if (current.length) {
+      chunks.push(current);
+      current = [];
+    }
+  };
+
+  for (const statement of normalizedStatements) {
+    if (!current.length) {
+      current.push(statement);
+      continue;
+    }
+
+    const candidate = [...current, statement];
+    const candidateBytes = estimateAuthoritySqlTransactionRequestBytes(candidate, database);
+    if (candidate.length > maxStatements || candidateBytes > maxBytes) {
+      pushCurrent();
+    }
+    current.push(statement);
+  }
+
+  pushCurrent();
+  return chunks;
+}
+
 function toPlainData(value, fallbackValue = null) {
   if (value == null) return fallbackValue;
   if (typeof globalThis.structuredClone === "function") {
@@ -390,13 +495,7 @@ export class AuthoritySqlHttpClient {
       database: this.database,
       statements: toArray(statements)
         .filter((statement) => statement?.sql)
-        .map((statement) => {
-          const positional = convertNamedParamsToPositional(String(statement.sql || ""), statement.params || {});
-          return {
-            statement: positional.sql,
-            params: positional.params,
-          };
-        }),
+        .map(normalizeAuthorityTransactionStatement),
     });
   }
 
@@ -1039,15 +1138,11 @@ export class AuthorityGraphStore {
     const normalizedStatements = toArray(statements).filter((statement) => statement?.sql);
     if (!normalizedStatements.length) return null;
 
-    const BATCH_SIZE = 150;
     if (typeof this.sqlClient?.transaction === "function") {
-      if (normalizedStatements.length <= BATCH_SIZE) {
-        return await this.sqlClient.transaction(normalizedStatements);
-      }
       let lastResult = null;
-      for (let i = 0; i < normalizedStatements.length; i += BATCH_SIZE) {
-        const batch = normalizedStatements.slice(i, i + BATCH_SIZE);
-        lastResult = await this.sqlClient.transaction(batch);
+      const chunks = this._buildTransactionChunks(normalizedStatements);
+      for (const batch of chunks) {
+        lastResult = await this._executeTransactionChunk(batch);
       }
       return lastResult;
     }
@@ -1078,6 +1173,42 @@ export class AuthorityGraphStore {
     return result;
   }
 
+  _buildTransactionChunks(statements = []) {
+    const budget = normalizeSqlTransactionByteBudget(
+      this.options?.sqlTransactionMaxBytes ?? this.options?.authoritySqlTransactionMaxBytes,
+    );
+    const maxBytes = Math.min(
+      AUTHORITY_SQL_TRANSACTION_MAX_REQUEST_BYTES - 64 * 1024,
+      Math.max(1024, budget),
+    );
+    return buildAuthoritySqlTransactionChunks(statements, {
+      database: this.sqlClient?.database || DEFAULT_AUTHORITY_SQL_DATABASE,
+      maxStatements: this.options?.sqlTransactionBatchSize ?? AUTHORITY_SQL_TRANSACTION_BATCH_SIZE,
+      maxBytes,
+    });
+  }
+
+  async _executeTransactionChunk(batch = []) {
+    const normalizedBatch = toArray(batch).filter((statement) => statement?.sql);
+    if (!normalizedBatch.length) return null;
+    try {
+      return await this.sqlClient.transaction(normalizedBatch);
+    } catch (error) {
+      if (!isAuthorityPayloadTooLargeError(error)) {
+        throw error;
+      }
+      if (normalizedBatch.length <= 1) {
+        throw createTerminalAuthoritySqlPayloadError(normalizedBatch[0], error);
+      }
+      const mid = Math.max(1, Math.floor(normalizedBatch.length / 2));
+      const left = normalizedBatch.slice(0, mid);
+      const right = normalizedBatch.slice(mid);
+      const leftResult = await this._executeTransactionChunk(left);
+      const rightResult = await this._executeTransactionChunk(right);
+      return rightResult ?? leftResult;
+    }
+  }
+
   _upsertMetaStatement(key, value, nowMs = Date.now()) {
     return {
       sql: `INSERT INTO ${AUTHORITY_TABLES.meta} (chat_id, meta_key, value_json, updated_at) VALUES (:chatId, :key, :valueJson, :updatedAt) ON CONFLICT(chat_id, meta_key) DO UPDATE SET value_json = excluded.value_json, updated_at = excluded.updated_at`,
diff --git a/tests/authority-graph-store.mjs b/tests/authority-graph-store.mjs
index fdad2af..63278cc 100644
--- a/tests/authority-graph-store.mjs
+++ b/tests/authority-graph-store.mjs
@@ -168,6 +168,18 @@ class MockAuthoritySqlClient {
   }
 }
 
+function createLimitError() {
+  const error = new Error("length limit exceeded");
+  error.status = 413;
+  error.category = "limit";
+  error.code = "length_limit_exceeded";
+  return error;
+}
+
+function isNodeUpsertStatement(statement = {}) {
+  return String(statement.sql || "").toLowerCase().includes("insert into st_bme_graph_nodes");
+}
+
 async function testOpenSeedsAuthorityMeta() {
   const sqlClient = new MockAuthoritySqlClient();
   const store = new AuthorityGraphStore("authority-chat-a", { sqlClient });
@@ -390,10 +402,104 @@ async function testConvertNamedParamsToPositional() {
   assert.deepEqual(r8.params, [5]);
 }
 
+async function testTransactionBatchingUsesByteBudget() {
+  const sqlClient = new MockAuthoritySqlClient();
+  const batchSizes = [];
+  const originalTransaction = sqlClient.transaction.bind(sqlClient);
+  sqlClient.transaction = async (statements = []) => {
+    if (statements.some(isNodeUpsertStatement)) {
+      batchSizes.push(statements.length);
+    }
+    return await originalTransaction(statements);
+  };
+  const store = new AuthorityGraphStore("authority-chat-byte-budget", {
+    sqlClient,
+    sqlTransactionBatchSize: 150,
+    sqlTransactionMaxBytes: 4096,
+  });
+  const nodes = Array.from({ length: 8 }, (_, index) => ({
+    id: `node-${index}`,
+    type: "event",
+    text: `payload-${index}-${"測試".repeat(260)}`,
+    updatedAt: index + 1,
+  }));
+
+  await store.bulkUpsertNodes(nodes);
+
+  assert.equal((await store.listNodes()).length, nodes.length);
+  assert.ok(batchSizes.length > 1, "expected large payload to split into multiple transactions");
+  assert.ok(batchSizes.every((size) => size < nodes.length), "expected no single node transaction batch to contain all records");
+}
+
+async function testTransaction413SplitsWithoutRepeatingOversizedBatch() {
+  const sqlClient = new MockAuthoritySqlClient();
+  const attemptedBatchSizes = [];
+  const originalTransaction = sqlClient.transaction.bind(sqlClient);
+  sqlClient.transaction = async (statements = []) => {
+    const nodeBatchSize = statements.filter(isNodeUpsertStatement).length;
+    if (nodeBatchSize > 0) {
+      attemptedBatchSizes.push(nodeBatchSize);
+      if (nodeBatchSize > 2) {
+        throw createLimitError();
+      }
+    }
+    return await originalTransaction(statements);
+  };
+  const store = new AuthorityGraphStore("authority-chat-413-split", {
+    sqlClient,
+    sqlTransactionBatchSize: 150,
+    sqlTransactionMaxBytes: 512 * 1024,
+  });
+  const nodes = Array.from({ length: 6 }, (_, index) => ({
+    id: `node-${index}`,
+    type: "event",
+    text: `payload-${index}`,
+    updatedAt: index + 1,
+  }));
+
+  await store.bulkUpsertNodes(nodes);
+
+  assert.equal((await store.listNodes()).length, nodes.length);
+  assert.equal(attemptedBatchSizes[0], 6);
+  assert.ok(attemptedBatchSizes.some((size) => size <= 2), "expected oversized request to be split below the failing size");
+  assert.ok(attemptedBatchSizes.length <= 10, "expected bounded split attempts instead of an endless retry loop");
+}
+
+async function testSingleStatement413IsTerminal() {
+  const sqlClient = new MockAuthoritySqlClient();
+  let oversizedAttempts = 0;
+  sqlClient.transaction = async (statements = []) => {
+    if (statements.some(isNodeUpsertStatement)) {
+      oversizedAttempts += 1;
+      throw createLimitError();
+    }
+    for (const statement of statements) {
+      await sqlClient.execute(statement.sql, statement.params || {});
+    }
+    return { executed: statements.length };
+  };
+  const store = new AuthorityGraphStore("authority-chat-single-413", { sqlClient });
+
+  await assert.rejects(
+    () => store.bulkUpsertNodes([{ id: "too-large", type: "event", text: "x".repeat(1024), updatedAt: 1 }]),
+    (error) => {
+      assert.equal(error.name, "AuthoritySqlPayloadTooLargeError");
+      assert.equal(error.nonRetryable, true);
+      assert.equal(error.terminal, true);
+      return true;
+    },
+  );
+  assert.ok(oversizedAttempts >= 1, "expected the oversized record to be attempted at least once");
+  assert.ok(oversizedAttempts <= 4, "single oversized record must not retry forever");
+}
+
 await testConvertNamedParamsToPositional();
 await testOpenSeedsAuthorityMeta();
 await testImportCommitAndExportSnapshot();
 await testPruneAndClear();
 await testHttpSqlClientBoundary();
+await testTransactionBatchingUsesByteBudget();
+await testTransaction413SplitsWithoutRepeatingOversizedBatch();
+await testSingleStatement413IsTerminal();
 
 console.log(`${PREFIX} all tests passed`);