Compare commits

...

2 Commits

Author SHA1 Message Date
github-actions[bot]
54ea001b8c chore: bump manifest version [skip ci] 2026-05-06 15:37:46 +00:00
youzini
db5a341617 fix(authority): split graph SQL persistence by payload size 2026-05-06 15:37:19 +00:00
3 changed files with 252 additions and 15 deletions

View File

@@ -6,6 +6,6 @@
"js": "index.js",
"css": "style.css",
"author": "Youzini",
"version": "6.2.9",
"version": "6.3.0",
"homePage": "https://github.com/Youzini-afk/ST-Bionic-Memory-Ecology"
}

View File

@@ -18,6 +18,9 @@ const DEFAULT_AUTHORITY_SQL_DATABASE = "default";
const AUTHORITY_SQL_QUERY_ENDPOINT = "/sql/query";
const AUTHORITY_SQL_EXEC_ENDPOINT = "/sql/exec";
const AUTHORITY_SQL_TRANSACTION_ENDPOINT = "/sql/transaction";
const AUTHORITY_SQL_TRANSACTION_BATCH_SIZE = 150;
const AUTHORITY_SQL_TRANSACTION_MAX_REQUEST_BYTES = 1024 * 1024;
const AUTHORITY_SQL_TRANSACTION_SAFE_REQUEST_BYTES = 512 * 1024;
const AUTHORITY_TABLES = Object.freeze({
meta: "st_bme_graph_meta",
@@ -85,6 +88,108 @@ function estimatePersistPayloadBytes(value = null) {
}
}
function measureJsonBytes(value = null) {
let json = "";
try {
json = JSON.stringify(value ?? null);
} catch {
json = "";
}
if (typeof TextEncoder === "function") {
return new TextEncoder().encode(json).byteLength;
}
return json.length * 3;
}
function normalizeSqlTransactionByteBudget(value) {
const parsed = Number(value);
if (!Number.isFinite(parsed) || parsed <= 0) return AUTHORITY_SQL_TRANSACTION_SAFE_REQUEST_BYTES;
return Math.max(1024, Math.floor(parsed));
}
function isAuthorityPayloadTooLargeError(error = null) {
const status = Number(error?.status || error?.payload?.status || 0);
const category = String(error?.category || error?.payload?.category || "").toLowerCase();
const code = String(error?.code || error?.payload?.code || error?.payload?.reason || "").toLowerCase();
const message = String(error?.message || error?.payload?.error || error?.payload?.message || "").toLowerCase();
return (
status === 413 ||
category === "payload-too-large" ||
category === "limit" ||
code.includes("length_limit") ||
code.includes("payload") ||
message.includes("length limit") ||
message.includes("payload too large") ||
message.includes("request entity too large")
);
}
function createTerminalAuthoritySqlPayloadError(statement = null, cause = null) {
const estimatedBytes = estimateAuthoritySqlTransactionRequestBytes([statement], DEFAULT_AUTHORITY_SQL_DATABASE);
const error = new Error(
`Authority SQL transaction payload is too large for a single graph persistence statement (${estimatedBytes} bytes estimated); refusing endless retry`,
);
error.name = "AuthoritySqlPayloadTooLargeError";
error.status = Number(cause?.status || 413);
error.code = "authority_sql_payload_too_large";
error.category = "payload-too-large";
error.terminal = true;
error.nonRetryable = true;
error.estimatedBytes = estimatedBytes;
error.cause = cause;
return error;
}
function normalizeAuthorityTransactionStatement(statement = {}) {
const positional = convertNamedParamsToPositional(String(statement?.sql || ""), statement?.params || {});
return {
statement: positional.sql,
params: positional.params,
};
}
function estimateAuthoritySqlTransactionRequestBytes(statements = [], database = DEFAULT_AUTHORITY_SQL_DATABASE) {
return measureJsonBytes({
database: normalizeRecordId(database) || DEFAULT_AUTHORITY_SQL_DATABASE,
statements: toArray(statements)
.filter((statement) => statement?.sql)
.map(normalizeAuthorityTransactionStatement),
});
}
function buildAuthoritySqlTransactionChunks(statements = [], options = {}) {
const normalizedStatements = toArray(statements).filter((statement) => statement?.sql);
const maxStatements = Math.max(1, Math.floor(Number(options.maxStatements) || AUTHORITY_SQL_TRANSACTION_BATCH_SIZE));
const maxBytes = normalizeSqlTransactionByteBudget(options.maxBytes);
const database = normalizeRecordId(options.database) || DEFAULT_AUTHORITY_SQL_DATABASE;
const chunks = [];
let current = [];
const pushCurrent = () => {
if (current.length) {
chunks.push(current);
current = [];
}
};
for (const statement of normalizedStatements) {
if (!current.length) {
current.push(statement);
continue;
}
const candidate = [...current, statement];
const candidateBytes = estimateAuthoritySqlTransactionRequestBytes(candidate, database);
if (candidate.length > maxStatements || candidateBytes > maxBytes) {
pushCurrent();
}
current.push(statement);
}
pushCurrent();
return chunks;
}
function toPlainData(value, fallbackValue = null) {
if (value == null) return fallbackValue;
if (typeof globalThis.structuredClone === "function") {
@@ -390,13 +495,7 @@ export class AuthoritySqlHttpClient {
database: this.database,
statements: toArray(statements)
.filter((statement) => statement?.sql)
.map((statement) => {
const positional = convertNamedParamsToPositional(String(statement.sql || ""), statement.params || {});
return {
statement: positional.sql,
params: positional.params,
};
}),
.map(normalizeAuthorityTransactionStatement),
});
}
@@ -1039,15 +1138,11 @@ export class AuthorityGraphStore {
const normalizedStatements = toArray(statements).filter((statement) => statement?.sql);
if (!normalizedStatements.length) return null;
const BATCH_SIZE = 150;
if (typeof this.sqlClient?.transaction === "function") {
if (normalizedStatements.length <= BATCH_SIZE) {
return await this.sqlClient.transaction(normalizedStatements);
}
let lastResult = null;
for (let i = 0; i < normalizedStatements.length; i += BATCH_SIZE) {
const batch = normalizedStatements.slice(i, i + BATCH_SIZE);
lastResult = await this.sqlClient.transaction(batch);
const chunks = this._buildTransactionChunks(normalizedStatements);
for (const batch of chunks) {
lastResult = await this._executeTransactionChunk(batch);
}
return lastResult;
}
@@ -1078,6 +1173,42 @@ export class AuthorityGraphStore {
return result;
}
_buildTransactionChunks(statements = []) {
const budget = normalizeSqlTransactionByteBudget(
this.options?.sqlTransactionMaxBytes ?? this.options?.authoritySqlTransactionMaxBytes,
);
const maxBytes = Math.min(
AUTHORITY_SQL_TRANSACTION_MAX_REQUEST_BYTES - 64 * 1024,
Math.max(1024, budget),
);
return buildAuthoritySqlTransactionChunks(statements, {
database: this.sqlClient?.database || DEFAULT_AUTHORITY_SQL_DATABASE,
maxStatements: this.options?.sqlTransactionBatchSize ?? AUTHORITY_SQL_TRANSACTION_BATCH_SIZE,
maxBytes,
});
}
async _executeTransactionChunk(batch = []) {
const normalizedBatch = toArray(batch).filter((statement) => statement?.sql);
if (!normalizedBatch.length) return null;
try {
return await this.sqlClient.transaction(normalizedBatch);
} catch (error) {
if (!isAuthorityPayloadTooLargeError(error)) {
throw error;
}
if (normalizedBatch.length <= 1) {
throw createTerminalAuthoritySqlPayloadError(normalizedBatch[0], error);
}
const mid = Math.max(1, Math.floor(normalizedBatch.length / 2));
const left = normalizedBatch.slice(0, mid);
const right = normalizedBatch.slice(mid);
const leftResult = await this._executeTransactionChunk(left);
const rightResult = await this._executeTransactionChunk(right);
return rightResult ?? leftResult;
}
}
_upsertMetaStatement(key, value, nowMs = Date.now()) {
return {
sql: `INSERT INTO ${AUTHORITY_TABLES.meta} (chat_id, meta_key, value_json, updated_at) VALUES (:chatId, :key, :valueJson, :updatedAt) ON CONFLICT(chat_id, meta_key) DO UPDATE SET value_json = excluded.value_json, updated_at = excluded.updated_at`,

View File

@@ -168,6 +168,18 @@ class MockAuthoritySqlClient {
}
}
function createLimitError() {
const error = new Error("length limit exceeded");
error.status = 413;
error.category = "limit";
error.code = "length_limit_exceeded";
return error;
}
function isNodeUpsertStatement(statement = {}) {
return String(statement.sql || "").toLowerCase().includes("insert into st_bme_graph_nodes");
}
async function testOpenSeedsAuthorityMeta() {
const sqlClient = new MockAuthoritySqlClient();
const store = new AuthorityGraphStore("authority-chat-a", { sqlClient });
@@ -390,10 +402,104 @@ async function testConvertNamedParamsToPositional() {
assert.deepEqual(r8.params, [5]);
}
async function testTransactionBatchingUsesByteBudget() {
const sqlClient = new MockAuthoritySqlClient();
const batchSizes = [];
const originalTransaction = sqlClient.transaction.bind(sqlClient);
sqlClient.transaction = async (statements = []) => {
if (statements.some(isNodeUpsertStatement)) {
batchSizes.push(statements.length);
}
return await originalTransaction(statements);
};
const store = new AuthorityGraphStore("authority-chat-byte-budget", {
sqlClient,
sqlTransactionBatchSize: 150,
sqlTransactionMaxBytes: 4096,
});
const nodes = Array.from({ length: 8 }, (_, index) => ({
id: `node-${index}`,
type: "event",
text: `payload-${index}-${"測試".repeat(260)}`,
updatedAt: index + 1,
}));
await store.bulkUpsertNodes(nodes);
assert.equal((await store.listNodes()).length, nodes.length);
assert.ok(batchSizes.length > 1, "expected large payload to split into multiple transactions");
assert.ok(batchSizes.every((size) => size < nodes.length), "expected no single node transaction batch to contain all records");
}
async function testTransaction413SplitsWithoutRepeatingOversizedBatch() {
const sqlClient = new MockAuthoritySqlClient();
const attemptedBatchSizes = [];
const originalTransaction = sqlClient.transaction.bind(sqlClient);
sqlClient.transaction = async (statements = []) => {
const nodeBatchSize = statements.filter(isNodeUpsertStatement).length;
if (nodeBatchSize > 0) {
attemptedBatchSizes.push(nodeBatchSize);
if (nodeBatchSize > 2) {
throw createLimitError();
}
}
return await originalTransaction(statements);
};
const store = new AuthorityGraphStore("authority-chat-413-split", {
sqlClient,
sqlTransactionBatchSize: 150,
sqlTransactionMaxBytes: 512 * 1024,
});
const nodes = Array.from({ length: 6 }, (_, index) => ({
id: `node-${index}`,
type: "event",
text: `payload-${index}`,
updatedAt: index + 1,
}));
await store.bulkUpsertNodes(nodes);
assert.equal((await store.listNodes()).length, nodes.length);
assert.equal(attemptedBatchSizes[0], 6);
assert.ok(attemptedBatchSizes.some((size) => size <= 2), "expected oversized request to be split below the failing size");
assert.ok(attemptedBatchSizes.length <= 10, "expected bounded split attempts instead of an endless retry loop");
}
async function testSingleStatement413IsTerminal() {
const sqlClient = new MockAuthoritySqlClient();
let oversizedAttempts = 0;
sqlClient.transaction = async (statements = []) => {
if (statements.some(isNodeUpsertStatement)) {
oversizedAttempts += 1;
throw createLimitError();
}
for (const statement of statements) {
await sqlClient.execute(statement.sql, statement.params || {});
}
return { executed: statements.length };
};
const store = new AuthorityGraphStore("authority-chat-single-413", { sqlClient });
await assert.rejects(
() => store.bulkUpsertNodes([{ id: "too-large", type: "event", text: "x".repeat(1024), updatedAt: 1 }]),
(error) => {
assert.equal(error.name, "AuthoritySqlPayloadTooLargeError");
assert.equal(error.nonRetryable, true);
assert.equal(error.terminal, true);
return true;
},
);
assert.ok(oversizedAttempts >= 1, "expected the oversized record to be attempted at least once");
assert.ok(oversizedAttempts <= 4, "single oversized record must not retry forever");
}
await testConvertNamedParamsToPositional();
await testOpenSeedsAuthorityMeta();
await testImportCommitAndExportSnapshot();
await testPruneAndClear();
await testHttpSqlClientBoundary();
await testTransactionBatchingUsesByteBudget();
await testTransaction413SplitsWithoutRepeatingOversizedBatch();
await testSingleStatement413IsTerminal();
console.log(`${PREFIX} all tests passed`);