fix(authority): split graph SQL persistence by payload size

This commit is contained in:
youzini
2026-05-06 15:37:11 +00:00
parent 60227ac4a8
commit db5a341617
2 changed files with 251 additions and 14 deletions

View File

@@ -18,6 +18,9 @@ const DEFAULT_AUTHORITY_SQL_DATABASE = "default";
const AUTHORITY_SQL_QUERY_ENDPOINT = "/sql/query";
const AUTHORITY_SQL_EXEC_ENDPOINT = "/sql/exec";
const AUTHORITY_SQL_TRANSACTION_ENDPOINT = "/sql/transaction";
const AUTHORITY_SQL_TRANSACTION_BATCH_SIZE = 150;
const AUTHORITY_SQL_TRANSACTION_MAX_REQUEST_BYTES = 1024 * 1024;
const AUTHORITY_SQL_TRANSACTION_SAFE_REQUEST_BYTES = 512 * 1024;
const AUTHORITY_TABLES = Object.freeze({
meta: "st_bme_graph_meta",
@@ -85,6 +88,108 @@ function estimatePersistPayloadBytes(value = null) {
}
}
function measureJsonBytes(value = null) {
  // Byte size of the JSON serialization of `value`; 0 when it cannot be
  // serialized (cycles, BigInt, etc.).
  let serialized;
  try {
    serialized = JSON.stringify(value ?? null);
  } catch {
    serialized = "";
  }
  if (typeof TextEncoder !== "function") {
    // No TextEncoder available: assume the UTF-8 worst case of 3 bytes per
    // UTF-16 code unit so the estimate errs on the conservative side.
    return serialized.length * 3;
  }
  return new TextEncoder().encode(serialized).byteLength;
}
function normalizeSqlTransactionByteBudget(value) {
  // Coerce a configured byte budget into a positive integer with a 1 KiB
  // floor; anything non-numeric or non-positive falls back to the safe default.
  const numeric = Number(value);
  if (Number.isFinite(numeric) && numeric > 0) {
    return Math.max(1024, Math.floor(numeric));
  }
  return AUTHORITY_SQL_TRANSACTION_SAFE_REQUEST_BYTES;
}
function isAuthorityPayloadTooLargeError(error = null) {
  // Heuristic check for "request too big" failures across the several shapes
  // the authority backend reports them in: HTTP status, error category,
  // machine-readable code, or a human-readable message.
  const status = Number(error?.status || error?.payload?.status || 0);
  if (status === 413) return true;
  const category = String(error?.category || error?.payload?.category || "").toLowerCase();
  if (category === "payload-too-large" || category === "limit") return true;
  const code = String(error?.code || error?.payload?.code || error?.payload?.reason || "").toLowerCase();
  if (code.includes("length_limit") || code.includes("payload")) return true;
  const message = String(error?.message || error?.payload?.error || error?.payload?.message || "").toLowerCase();
  const needles = ["length limit", "payload too large", "request entity too large"];
  return needles.some((needle) => message.includes(needle));
}
function createTerminalAuthoritySqlPayloadError(statement = null, cause = null) {
  // Build a non-retryable error for a single statement whose payload alone
  // exceeds the authority's request limit: splitting further is impossible,
  // so callers must not keep retrying.
  const estimatedBytes = estimateAuthoritySqlTransactionRequestBytes([statement], DEFAULT_AUTHORITY_SQL_DATABASE);
  const error = new Error(
    `Authority SQL transaction payload is too large for a single graph persistence statement (${estimatedBytes} bytes estimated); refusing endless retry`,
  );
  Object.assign(error, {
    name: "AuthoritySqlPayloadTooLargeError",
    status: Number(cause?.status || 413),
    code: "authority_sql_payload_too_large",
    category: "payload-too-large",
    terminal: true,
    nonRetryable: true,
    estimatedBytes,
    cause,
  });
  return error;
}
function normalizeAuthorityTransactionStatement(statement = {}) {
  // Convert a { sql, params } record using named parameters into the
  // { statement, params } shape (positional params) the authority API expects.
  const { sql, params } = convertNamedParamsToPositional(String(statement?.sql || ""), statement?.params || {});
  return { statement: sql, params };
}
function estimateAuthoritySqlTransactionRequestBytes(statements = [], database = DEFAULT_AUTHORITY_SQL_DATABASE) {
  // Approximate the serialized size of a /sql/transaction request body by
  // building the same JSON envelope the client would send and measuring it.
  const usable = toArray(statements).filter((statement) => statement?.sql);
  const requestBody = {
    database: normalizeRecordId(database) || DEFAULT_AUTHORITY_SQL_DATABASE,
    statements: usable.map(normalizeAuthorityTransactionStatement),
  };
  return measureJsonBytes(requestBody);
}
function buildAuthoritySqlTransactionChunks(statements = [], options = {}) {
  // Split statements into transaction batches bounded by both a statement
  // count and an estimated request byte budget. A chunk always holds at least
  // one statement, even when that single statement exceeds the byte budget —
  // oversized singletons are dealt with downstream (bisection / terminal error).
  const usable = toArray(statements).filter((statement) => statement?.sql);
  const maxStatements = Math.max(1, Math.floor(Number(options.maxStatements) || AUTHORITY_SQL_TRANSACTION_BATCH_SIZE));
  const maxBytes = normalizeSqlTransactionByteBudget(options.maxBytes);
  const database = normalizeRecordId(options.database) || DEFAULT_AUTHORITY_SQL_DATABASE;
  const chunks = [];
  let pending = [];
  for (const statement of usable) {
    if (pending.length) {
      const grown = [...pending, statement];
      const overflows =
        grown.length > maxStatements ||
        estimateAuthoritySqlTransactionRequestBytes(grown, database) > maxBytes;
      if (overflows) {
        chunks.push(pending);
        pending = [];
      }
    }
    pending.push(statement);
  }
  if (pending.length) chunks.push(pending);
  return chunks;
}
function toPlainData(value, fallbackValue = null) {
if (value == null) return fallbackValue;
if (typeof globalThis.structuredClone === "function") {
@@ -390,13 +495,7 @@ export class AuthoritySqlHttpClient {
database: this.database,
statements: toArray(statements)
.filter((statement) => statement?.sql)
.map((statement) => {
const positional = convertNamedParamsToPositional(String(statement.sql || ""), statement.params || {});
return {
statement: positional.sql,
params: positional.params,
};
}),
.map(normalizeAuthorityTransactionStatement),
});
}
@@ -1039,15 +1138,11 @@ export class AuthorityGraphStore {
const normalizedStatements = toArray(statements).filter((statement) => statement?.sql);
if (!normalizedStatements.length) return null;
const BATCH_SIZE = 150;
if (typeof this.sqlClient?.transaction === "function") {
if (normalizedStatements.length <= BATCH_SIZE) {
return await this.sqlClient.transaction(normalizedStatements);
}
let lastResult = null;
for (let i = 0; i < normalizedStatements.length; i += BATCH_SIZE) {
const batch = normalizedStatements.slice(i, i + BATCH_SIZE);
lastResult = await this.sqlClient.transaction(batch);
const chunks = this._buildTransactionChunks(normalizedStatements);
for (const batch of chunks) {
lastResult = await this._executeTransactionChunk(batch);
}
return lastResult;
}
@@ -1078,6 +1173,42 @@ export class AuthorityGraphStore {
return result;
}
_buildTransactionChunks(statements = []) {
const budget = normalizeSqlTransactionByteBudget(
this.options?.sqlTransactionMaxBytes ?? this.options?.authoritySqlTransactionMaxBytes,
);
const maxBytes = Math.min(
AUTHORITY_SQL_TRANSACTION_MAX_REQUEST_BYTES - 64 * 1024,
Math.max(1024, budget),
);
return buildAuthoritySqlTransactionChunks(statements, {
database: this.sqlClient?.database || DEFAULT_AUTHORITY_SQL_DATABASE,
maxStatements: this.options?.sqlTransactionBatchSize ?? AUTHORITY_SQL_TRANSACTION_BATCH_SIZE,
maxBytes,
});
}
async _executeTransactionChunk(batch = []) {
const normalizedBatch = toArray(batch).filter((statement) => statement?.sql);
if (!normalizedBatch.length) return null;
try {
return await this.sqlClient.transaction(normalizedBatch);
} catch (error) {
if (!isAuthorityPayloadTooLargeError(error)) {
throw error;
}
if (normalizedBatch.length <= 1) {
throw createTerminalAuthoritySqlPayloadError(normalizedBatch[0], error);
}
const mid = Math.max(1, Math.floor(normalizedBatch.length / 2));
const left = normalizedBatch.slice(0, mid);
const right = normalizedBatch.slice(mid);
const leftResult = await this._executeTransactionChunk(left);
const rightResult = await this._executeTransactionChunk(right);
return rightResult ?? leftResult;
}
}
_upsertMetaStatement(key, value, nowMs = Date.now()) {
return {
sql: `INSERT INTO ${AUTHORITY_TABLES.meta} (chat_id, meta_key, value_json, updated_at) VALUES (:chatId, :key, :valueJson, :updatedAt) ON CONFLICT(chat_id, meta_key) DO UPDATE SET value_json = excluded.value_json, updated_at = excluded.updated_at`,