mirror of
https://github.com/Youzini-afk/ST-Bionic-Memory-Ecology.git
synced 2026-05-15 22:30:38 +08:00
506 lines
17 KiB
JavaScript
506 lines
17 KiB
JavaScript
import assert from "node:assert/strict";
|
|
|
|
import {
|
|
AUTHORITY_GRAPH_STORE_KIND,
|
|
AUTHORITY_GRAPH_STORE_MODE,
|
|
AuthorityGraphStore,
|
|
AuthoritySqlHttpClient,
|
|
convertNamedParamsToPositional,
|
|
} from "../sync/authority-graph-store.js";
|
|
import {
|
|
BME_DB_SCHEMA_VERSION,
|
|
BME_TOMBSTONE_RETENTION_MS,
|
|
} from "../sync/bme-db.js";
|
|
|
|
const PREFIX = "[ST-BME][authority-graph-store]";
|
|
|
|
class MockAuthoritySqlClient {
|
|
constructor() {
|
|
this.meta = new Map();
|
|
this.nodes = new Map();
|
|
this.edges = new Map();
|
|
this.tombstones = new Map();
|
|
this.statements = [];
|
|
}
|
|
|
|
async transaction(statements = []) {
|
|
for (const statement of statements) {
|
|
await this.execute(statement.sql, statement.params || {});
|
|
}
|
|
return { executed: statements.length };
|
|
}
|
|
|
|
async execute(sql, params = {}) {
|
|
this.statements.push({ sql, params });
|
|
const normalizedSql = String(sql || "").toLowerCase();
|
|
if (normalizedSql.startsWith("create table")) {
|
|
return { ok: true };
|
|
}
|
|
if (normalizedSql.includes("insert into st_bme_graph_meta")) {
|
|
this.meta.set(this._key(params.chatId, params.key), {
|
|
chat_id: params.chatId,
|
|
meta_key: params.key,
|
|
value_json: params.valueJson,
|
|
updated_at: params.updatedAt,
|
|
});
|
|
return { ok: true };
|
|
}
|
|
if (normalizedSql.includes("insert into st_bme_graph_nodes")) {
|
|
this.nodes.set(this._key(params.chatId, params.id), {
|
|
chat_id: params.chatId,
|
|
record_id: params.id,
|
|
payload_json: params.payloadJson,
|
|
node_type: params.type,
|
|
source_floor: params.sourceFloor,
|
|
archived: params.archived,
|
|
updated_at: params.updatedAt,
|
|
deleted_at: params.deletedAt,
|
|
});
|
|
return { ok: true };
|
|
}
|
|
if (normalizedSql.includes("insert into st_bme_graph_edges")) {
|
|
this.edges.set(this._key(params.chatId, params.id), {
|
|
chat_id: params.chatId,
|
|
record_id: params.id,
|
|
payload_json: params.payloadJson,
|
|
from_id: params.fromId,
|
|
to_id: params.toId,
|
|
relation: params.relation,
|
|
source_floor: params.sourceFloor,
|
|
updated_at: params.updatedAt,
|
|
deleted_at: params.deletedAt,
|
|
});
|
|
return { ok: true };
|
|
}
|
|
if (normalizedSql.includes("insert into st_bme_graph_tombstones")) {
|
|
this.tombstones.set(this._key(params.chatId, params.id), {
|
|
chat_id: params.chatId,
|
|
record_id: params.id,
|
|
payload_json: params.payloadJson,
|
|
tombstone_kind: params.kind,
|
|
target_id: params.targetId,
|
|
deleted_at: params.deletedAt,
|
|
source_device_id: params.sourceDeviceId,
|
|
});
|
|
return { ok: true };
|
|
}
|
|
if (normalizedSql.startsWith("delete from st_bme_graph_nodes")) {
|
|
this._deleteRows(this.nodes, params);
|
|
return { ok: true };
|
|
}
|
|
if (normalizedSql.startsWith("delete from st_bme_graph_edges")) {
|
|
this._deleteRows(this.edges, params);
|
|
return { ok: true };
|
|
}
|
|
if (normalizedSql.startsWith("delete from st_bme_graph_tombstones")) {
|
|
this._deleteRows(this.tombstones, params);
|
|
return { ok: true };
|
|
}
|
|
if (normalizedSql.startsWith("delete from st_bme_graph_meta")) {
|
|
this._deleteRows(this.meta, params);
|
|
return { ok: true };
|
|
}
|
|
throw new Error(`Unhandled SQL execute: ${sql}`);
|
|
}
|
|
|
|
async query(sql, params = {}) {
|
|
const normalizedSql = String(sql || "").toLowerCase();
|
|
if (normalizedSql.includes("from st_bme_graph_meta")) {
|
|
return this._readRows(this.meta, params).map((row) => ({
|
|
key: row.meta_key,
|
|
valueJson: row.value_json,
|
|
}));
|
|
}
|
|
if (normalizedSql.includes("from st_bme_graph_nodes")) {
|
|
if (normalizedSql.includes("count(*)")) {
|
|
return [{ count: this._readRows(this.nodes, params).length }];
|
|
}
|
|
return this._readRows(this.nodes, params).map((row) => ({
|
|
payloadJson: row.payload_json,
|
|
}));
|
|
}
|
|
if (normalizedSql.includes("from st_bme_graph_edges")) {
|
|
if (normalizedSql.includes("count(*)")) {
|
|
return [{ count: this._readRows(this.edges, params).length }];
|
|
}
|
|
return this._readRows(this.edges, params).map((row) => ({
|
|
payloadJson: row.payload_json,
|
|
}));
|
|
}
|
|
if (normalizedSql.includes("from st_bme_graph_tombstones")) {
|
|
if (normalizedSql.includes("count(*)")) {
|
|
return [{ count: this._readRows(this.tombstones, params).length }];
|
|
}
|
|
if (normalizedSql.includes("deleted_at <")) {
|
|
return this._readRows(this.tombstones, params)
|
|
.filter((row) => Number(row.deleted_at) < Number(params.cutoffMs))
|
|
.map((row) => ({ id: row.record_id }));
|
|
}
|
|
return this._readRows(this.tombstones, params).map((row) => ({
|
|
payloadJson: row.payload_json,
|
|
}));
|
|
}
|
|
throw new Error(`Unhandled SQL query: ${sql}`);
|
|
}
|
|
|
|
_key(chatId, id) {
|
|
return `${String(chatId || "")}\u0000${String(id || "")}`;
|
|
}
|
|
|
|
_readRows(table, params = {}) {
|
|
const chatId = String(params.chatId || "");
|
|
const id = params.id ?? params.key;
|
|
return Array.from(table.values()).filter((row) => {
|
|
if (String(row.chat_id || "") !== chatId) return false;
|
|
if (id == null) return true;
|
|
return String(row.record_id ?? row.meta_key ?? "") === String(id);
|
|
});
|
|
}
|
|
|
|
_deleteRows(table, params = {}) {
|
|
const chatId = String(params.chatId || "");
|
|
const id = params.id ?? params.key;
|
|
for (const [key, row] of table.entries()) {
|
|
if (String(row.chat_id || "") !== chatId) continue;
|
|
if (id != null && String(row.record_id ?? row.meta_key ?? "") !== String(id)) continue;
|
|
table.delete(key);
|
|
}
|
|
}
|
|
}
|
|
|
|
function createLimitError() {
|
|
const error = new Error("length limit exceeded");
|
|
error.status = 413;
|
|
error.category = "limit";
|
|
error.code = "length_limit_exceeded";
|
|
return error;
|
|
}
|
|
|
|
function isNodeUpsertStatement(statement = {}) {
|
|
return String(statement.sql || "").toLowerCase().includes("insert into st_bme_graph_nodes");
|
|
}
|
|
|
|
async function testOpenSeedsAuthorityMeta() {
|
|
const sqlClient = new MockAuthoritySqlClient();
|
|
const store = new AuthorityGraphStore("authority-chat-a", { sqlClient });
|
|
await store.open();
|
|
|
|
assert.equal(store.storeKind, AUTHORITY_GRAPH_STORE_KIND);
|
|
assert.equal(store.storeMode, AUTHORITY_GRAPH_STORE_MODE);
|
|
assert.equal(await store.getMeta("schemaVersion"), BME_DB_SCHEMA_VERSION);
|
|
assert.equal(await store.getMeta("storagePrimary"), AUTHORITY_GRAPH_STORE_KIND);
|
|
assert.equal(await store.getRevision(), 0);
|
|
|
|
const diagnostics = store.getStorageDiagnosticsSync();
|
|
assert.equal(diagnostics.storageKind, AUTHORITY_GRAPH_STORE_KIND);
|
|
assert.equal(diagnostics.browserCacheMode, "minimal");
|
|
}
|
|
|
|
async function testImportCommitAndExportSnapshot() {
|
|
const sqlClient = new MockAuthoritySqlClient();
|
|
const store = new AuthorityGraphStore("authority-chat-b", { sqlClient });
|
|
await store.open();
|
|
|
|
const importResult = await store.importSnapshot(
|
|
{
|
|
meta: {
|
|
revision: 7,
|
|
lastProcessedFloor: 3,
|
|
extractionCount: 4,
|
|
},
|
|
nodes: [
|
|
{ id: "node-1", type: "event", sourceFloor: 1, updatedAt: 10 },
|
|
{ id: "node-2", type: "event", archived: true, updatedAt: 20 },
|
|
{ id: "node-3", type: "memory", deletedAt: 30, updatedAt: 30 },
|
|
],
|
|
edges: [
|
|
{
|
|
id: "edge-1",
|
|
fromId: "node-1",
|
|
toId: "node-3",
|
|
relation: "refers",
|
|
updatedAt: 40,
|
|
},
|
|
],
|
|
tombstones: [
|
|
{
|
|
id: "tombstone-1",
|
|
kind: "node",
|
|
targetId: "node-old",
|
|
deletedAt: 50,
|
|
},
|
|
],
|
|
},
|
|
{ preserveRevision: true },
|
|
);
|
|
|
|
assert.equal(importResult.revision, 7);
|
|
assert.deepEqual(importResult.imported, { nodes: 3, edges: 1, tombstones: 1 });
|
|
assert.equal((await store.listNodes()).length, 3);
|
|
assert.deepEqual(
|
|
(await store.listNodes({ includeArchived: false, includeDeleted: false })).map((node) => node.id),
|
|
["node-1"],
|
|
);
|
|
assert.deepEqual((await store.listEdges({ relation: "refers" })).map((edge) => edge.id), ["edge-1"]);
|
|
|
|
const commitResult = await store.commitDelta(
|
|
{
|
|
upsertNodes: [{ id: "node-4", type: "event", updatedAt: 60 }],
|
|
deleteNodeIds: ["node-2"],
|
|
countDelta: {
|
|
previous: { nodes: 3, edges: 1, tombstones: 1 },
|
|
delta: { nodes: 0, edges: 0, tombstones: 0 },
|
|
},
|
|
runtimeMetaPatch: {
|
|
lastProcessedFloor: 8,
|
|
revision: 999,
|
|
},
|
|
},
|
|
{
|
|
reason: "test-commit",
|
|
requestedRevision: 9,
|
|
},
|
|
);
|
|
|
|
assert.equal(commitResult.revision, 9);
|
|
assert.deepEqual(commitResult.imported, { nodes: 3, edges: 1, tombstones: 1 });
|
|
assert.equal(await store.getMeta("lastProcessedFloor"), 8);
|
|
assert.equal(await store.getRevision(), 9);
|
|
assert.equal(await store.getMeta("lastMutationReason"), "test-commit");
|
|
assert.equal(await store.getMeta("syncDirty"), true);
|
|
assert.deepEqual((await store.listNodes()).map((node) => node.id).sort(), ["node-1", "node-3", "node-4"]);
|
|
|
|
const snapshot = await store.exportSnapshot();
|
|
assert.equal(snapshot.meta.revision, 9);
|
|
assert.equal(snapshot.meta.storagePrimary, AUTHORITY_GRAPH_STORE_KIND);
|
|
assert.equal(snapshot.meta.storageMode, AUTHORITY_GRAPH_STORE_MODE);
|
|
assert.equal(snapshot.meta.nodeCount, 3);
|
|
assert.equal(snapshot.nodes.length, 3);
|
|
assert.equal(snapshot.edges.length, 1);
|
|
assert.equal(snapshot.tombstones.length, 1);
|
|
assert.equal(snapshot.state.lastProcessedFloor, 8);
|
|
}
|
|
|
|
async function testPruneAndClear() {
|
|
const sqlClient = new MockAuthoritySqlClient();
|
|
const store = new AuthorityGraphStore("authority-chat-c", { sqlClient });
|
|
await store.importSnapshot({
|
|
nodes: [{ id: "node-1", type: "event", updatedAt: 1 }],
|
|
tombstones: [
|
|
{ id: "old-tombstone", kind: "node", targetId: "old", deletedAt: 1 },
|
|
{
|
|
id: "new-tombstone",
|
|
kind: "node",
|
|
targetId: "new",
|
|
deletedAt: BME_TOMBSTONE_RETENTION_MS,
|
|
},
|
|
],
|
|
});
|
|
|
|
const pruneResult = await store.pruneExpiredTombstones(BME_TOMBSTONE_RETENTION_MS + 100);
|
|
assert.equal(pruneResult.pruned, 1);
|
|
assert.deepEqual((await store.listTombstones()).map((item) => item.id), ["new-tombstone"]);
|
|
|
|
const clearResult = await store.clearAll();
|
|
assert.equal(clearResult.cleared, true);
|
|
assert.equal((await store.isEmpty({ includeTombstones: true })).empty, true);
|
|
assert.equal(await store.getMeta("storagePrimary"), AUTHORITY_GRAPH_STORE_KIND);
|
|
}
|
|
|
|
async function testHttpSqlClientBoundary() {
|
|
const requests = [];
|
|
const client = new AuthoritySqlHttpClient({
|
|
baseUrl: "https://authority.example.test/root/",
|
|
headerProvider: () => ({ "X-Test": "1" }),
|
|
fetchImpl: async (url, init) => {
|
|
requests.push({ url, init });
|
|
if (url.endsWith("/session/init")) {
|
|
return {
|
|
ok: true,
|
|
status: 200,
|
|
async json() {
|
|
return { sessionToken: "sql-session-token" };
|
|
},
|
|
};
|
|
}
|
|
return {
|
|
ok: true,
|
|
status: 200,
|
|
async json() {
|
|
return { rows: [{ value: 1 }] };
|
|
},
|
|
};
|
|
},
|
|
});
|
|
|
|
const result = await client.query("SELECT 1", { chatId: "chat" });
|
|
assert.deepEqual(result, { rows: [{ value: 1 }] });
|
|
assert.deepEqual(
|
|
requests.map((request) => request.url),
|
|
[
|
|
"https://authority.example.test/root/session/init",
|
|
"https://authority.example.test/root/sql/query",
|
|
],
|
|
);
|
|
assert.equal(requests[1].init.method, "POST");
|
|
assert.equal(requests[1].init.headers["X-Test"], "1");
|
|
assert.equal(requests[1].init.headers["x-authority-session-token"], "sql-session-token");
|
|
assert.deepEqual(JSON.parse(requests[1].init.body), {
|
|
database: "default",
|
|
statement: "SELECT 1",
|
|
params: [],
|
|
});
|
|
}
|
|
|
|
async function testConvertNamedParamsToPositional() {
|
|
// Named params with :placeholders get converted to positional ? with array
|
|
const r1 = convertNamedParamsToPositional(
|
|
"SELECT * FROM t WHERE chat_id = :chatId AND meta_key = :key",
|
|
{ chatId: "abc", key: "rev" },
|
|
);
|
|
assert.equal(r1.sql, "SELECT * FROM t WHERE chat_id = ? AND meta_key = ?");
|
|
assert.deepEqual(r1.params, ["abc", "rev"]);
|
|
|
|
// Duplicate named params produce multiple positional entries
|
|
const r2 = convertNamedParamsToPositional(
|
|
"INSERT INTO t (a, b) VALUES (:chatId, :chatId)",
|
|
{ chatId: "dup" },
|
|
);
|
|
assert.equal(r2.sql, "INSERT INTO t (a, b) VALUES (?, ?)");
|
|
assert.deepEqual(r2.params, ["dup", "dup"]);
|
|
|
|
// No placeholders → empty array
|
|
const r3 = convertNamedParamsToPositional("SELECT 1", { chatId: "x" });
|
|
assert.equal(r3.sql, "SELECT 1");
|
|
assert.deepEqual(r3.params, []);
|
|
|
|
// Already-array params pass through unchanged
|
|
const r4 = convertNamedParamsToPositional("SELECT ?", [42]);
|
|
assert.equal(r4.sql, "SELECT ?");
|
|
assert.deepEqual(r4.params, [42]);
|
|
|
|
// Empty/null params → empty array
|
|
const r5 = convertNamedParamsToPositional("SELECT 1", null);
|
|
assert.deepEqual(r5.params, []);
|
|
const r6 = convertNamedParamsToPositional("SELECT 1", undefined);
|
|
assert.deepEqual(r6.params, []);
|
|
|
|
// Missing param name → null in array
|
|
const r7 = convertNamedParamsToPositional(
|
|
"WHERE x = :x AND y = :y",
|
|
{ x: 1 },
|
|
);
|
|
assert.equal(r7.sql, "WHERE x = ? AND y = ?");
|
|
assert.deepEqual(r7.params, [1, null]);
|
|
|
|
// ::typecast is not treated as a named param
|
|
const r8 = convertNamedParamsToPositional(
|
|
"SELECT x::text FROM t WHERE id = :id",
|
|
{ id: 5 },
|
|
);
|
|
assert.equal(r8.sql, "SELECT x::text FROM t WHERE id = ?");
|
|
assert.deepEqual(r8.params, [5]);
|
|
}
|
|
|
|
async function testTransactionBatchingUsesByteBudget() {
|
|
const sqlClient = new MockAuthoritySqlClient();
|
|
const batchSizes = [];
|
|
const originalTransaction = sqlClient.transaction.bind(sqlClient);
|
|
sqlClient.transaction = async (statements = []) => {
|
|
if (statements.some(isNodeUpsertStatement)) {
|
|
batchSizes.push(statements.length);
|
|
}
|
|
return await originalTransaction(statements);
|
|
};
|
|
const store = new AuthorityGraphStore("authority-chat-byte-budget", {
|
|
sqlClient,
|
|
sqlTransactionBatchSize: 150,
|
|
sqlTransactionMaxBytes: 4096,
|
|
});
|
|
const nodes = Array.from({ length: 8 }, (_, index) => ({
|
|
id: `node-${index}`,
|
|
type: "event",
|
|
text: `payload-${index}-${"測試".repeat(260)}`,
|
|
updatedAt: index + 1,
|
|
}));
|
|
|
|
await store.bulkUpsertNodes(nodes);
|
|
|
|
assert.equal((await store.listNodes()).length, nodes.length);
|
|
assert.ok(batchSizes.length > 1, "expected large payload to split into multiple transactions");
|
|
assert.ok(batchSizes.every((size) => size < nodes.length), "expected no single node transaction batch to contain all records");
|
|
}
|
|
|
|
async function testTransaction413SplitsWithoutRepeatingOversizedBatch() {
|
|
const sqlClient = new MockAuthoritySqlClient();
|
|
const attemptedBatchSizes = [];
|
|
const originalTransaction = sqlClient.transaction.bind(sqlClient);
|
|
sqlClient.transaction = async (statements = []) => {
|
|
const nodeBatchSize = statements.filter(isNodeUpsertStatement).length;
|
|
if (nodeBatchSize > 0) {
|
|
attemptedBatchSizes.push(nodeBatchSize);
|
|
if (nodeBatchSize > 2) {
|
|
throw createLimitError();
|
|
}
|
|
}
|
|
return await originalTransaction(statements);
|
|
};
|
|
const store = new AuthorityGraphStore("authority-chat-413-split", {
|
|
sqlClient,
|
|
sqlTransactionBatchSize: 150,
|
|
sqlTransactionMaxBytes: 512 * 1024,
|
|
});
|
|
const nodes = Array.from({ length: 6 }, (_, index) => ({
|
|
id: `node-${index}`,
|
|
type: "event",
|
|
text: `payload-${index}`,
|
|
updatedAt: index + 1,
|
|
}));
|
|
|
|
await store.bulkUpsertNodes(nodes);
|
|
|
|
assert.equal((await store.listNodes()).length, nodes.length);
|
|
assert.equal(attemptedBatchSizes[0], 6);
|
|
assert.ok(attemptedBatchSizes.some((size) => size <= 2), "expected oversized request to be split below the failing size");
|
|
assert.ok(attemptedBatchSizes.length <= 10, "expected bounded split attempts instead of an endless retry loop");
|
|
}
|
|
|
|
async function testSingleStatement413IsTerminal() {
|
|
const sqlClient = new MockAuthoritySqlClient();
|
|
let oversizedAttempts = 0;
|
|
sqlClient.transaction = async (statements = []) => {
|
|
if (statements.some(isNodeUpsertStatement)) {
|
|
oversizedAttempts += 1;
|
|
throw createLimitError();
|
|
}
|
|
for (const statement of statements) {
|
|
await sqlClient.execute(statement.sql, statement.params || {});
|
|
}
|
|
return { executed: statements.length };
|
|
};
|
|
const store = new AuthorityGraphStore("authority-chat-single-413", { sqlClient });
|
|
|
|
await assert.rejects(
|
|
() => store.bulkUpsertNodes([{ id: "too-large", type: "event", text: "x".repeat(1024), updatedAt: 1 }]),
|
|
(error) => {
|
|
assert.equal(error.name, "AuthoritySqlPayloadTooLargeError");
|
|
assert.equal(error.nonRetryable, true);
|
|
assert.equal(error.terminal, true);
|
|
return true;
|
|
},
|
|
);
|
|
assert.ok(oversizedAttempts >= 1, "expected the oversized record to be attempted at least once");
|
|
assert.ok(oversizedAttempts <= 4, "single oversized record must not retry forever");
|
|
}
|
|
|
|
await testConvertNamedParamsToPositional();
|
|
await testOpenSeedsAuthorityMeta();
|
|
await testImportCommitAndExportSnapshot();
|
|
await testPruneAndClear();
|
|
await testHttpSqlClientBoundary();
|
|
await testTransactionBatchingUsesByteBudget();
|
|
await testTransaction413SplitsWithoutRepeatingOversizedBatch();
|
|
await testSingleStatement413IsTerminal();
|
|
|
|
console.log(`${PREFIX} all tests passed`);
|