harden(authority): add pre-scale diagnostics and request safety

This commit is contained in:
Youzini-afk
2026-04-28 22:28:21 +08:00
parent aa62efe5b9
commit 2dee3cd8ff
9 changed files with 644 additions and 49 deletions

View File

@@ -61,6 +61,31 @@ export function normalizeAuthorityBlobPath(path = "") {
return normalized.replace(/\/+$/g, "");
}
function decodePathForValidation(path = "") {
try {
return decodeURIComponent(String(path || ""));
} catch {
return String(path || "");
}
}
function assertSafeAuthorityBlobPath(path = "", options = {}) {
const normalized = normalizeAuthorityBlobPath(path);
if (!normalized) {
if (options.allowEmpty) return "";
throw new Error("Authority Blob path is required");
}
const decoded = decodePathForValidation(normalized).replace(/\\/g, "/");
if (/^[A-Za-z]:(?:\/|$)/.test(decoded) || decoded.includes(":/")) {
throw new Error(`Unsafe Authority Blob path: ${normalized}`);
}
const segments = decoded.split("/").filter(Boolean);
if (segments.some((segment) => segment === "." || segment === "..")) {
throw new Error(`Unsafe Authority Blob path: ${normalized}`);
}
return normalized;
}
function normalizeBlobPayload(result = null) {
if (!result || typeof result !== "object" || Array.isArray(result)) return result;
const source = result.file || result.blob || result.result || result;
@@ -197,8 +222,9 @@ export class AuthorityBlobHttpClient {
}
async writeText(payload = {}) {
const path = assertSafeAuthorityBlobPath(payload.path || payload.name);
return await this.request(`${AUTHORITY_BLOB_ENDPOINT}/write-file`, {
path: normalizeAuthorityBlobPath(payload.path || payload.name),
path,
content: String(payload.text ?? payload.data ?? payload.content ?? ""),
encoding: "utf8",
createParents: true,
@@ -206,22 +232,25 @@ export class AuthorityBlobHttpClient {
}
async readJson(payload = {}) {
const path = assertSafeAuthorityBlobPath(payload.path || payload.name);
return await this.request(`${AUTHORITY_BLOB_ENDPOINT}/read-file`, {
path: normalizeAuthorityBlobPath(payload.path || payload.name),
path,
encoding: "utf8",
}, { signal: payload.signal });
}
async delete(payload = {}) {
const path = assertSafeAuthorityBlobPath(payload.path || payload.name);
return await this.request(`${AUTHORITY_BLOB_ENDPOINT}/delete`, {
path: normalizeAuthorityBlobPath(payload.path || payload.name),
path,
recursive: false,
}, { signal: payload.signal });
}
async stat(payload = {}) {
const path = assertSafeAuthorityBlobPath(payload.path || payload.name);
return await this.request(`${AUTHORITY_BLOB_ENDPOINT}/stat`, {
path: normalizeAuthorityBlobPath(payload.path || payload.name),
path,
}, { signal: payload.signal });
}
}
@@ -271,8 +300,7 @@ export class AuthorityBlobAdapter {
async writeJson(path, payload = null, options = {}) {
throwIfAborted(options.signal);
const normalizedPath = normalizeAuthorityBlobPath(path);
if (!normalizedPath) throw new Error("Authority Blob path is required");
const normalizedPath = assertSafeAuthorityBlobPath(path);
const result = await callClient(this.client, ["writeJson", "putJson", "writeFile", "put"], "writeJson", {
namespace: options.namespace || this.config.namespace,
path: normalizedPath,
@@ -287,8 +315,7 @@ export class AuthorityBlobAdapter {
async writeText(path, text = "", options = {}) {
throwIfAborted(options.signal);
const normalizedPath = normalizeAuthorityBlobPath(path);
if (!normalizedPath) throw new Error("Authority Blob path is required");
const normalizedPath = assertSafeAuthorityBlobPath(path);
const result = await callClient(this.client, ["writeText", "writeFile", "putText", "put"], "writeText", {
namespace: options.namespace || this.config.namespace,
path: normalizedPath,
@@ -303,7 +330,7 @@ export class AuthorityBlobAdapter {
async readJson(path, options = {}) {
throwIfAborted(options.signal);
const normalizedPath = normalizeAuthorityBlobPath(path);
const normalizedPath = assertSafeAuthorityBlobPath(path, { allowEmpty: true });
if (!normalizedPath) return normalizeAuthorityBlobReadResult({ exists: false }, "");
try {
const result = await callClient(this.client, ["readJson", "getJson", "readFile", "get"], "readJson", {
@@ -322,7 +349,7 @@ export class AuthorityBlobAdapter {
async delete(path, options = {}) {
throwIfAborted(options.signal);
const normalizedPath = normalizeAuthorityBlobPath(path);
const normalizedPath = assertSafeAuthorityBlobPath(path, { allowEmpty: true });
if (!normalizedPath) return normalizeAuthorityBlobDeleteResult({ exists: false }, "");
try {
const result = await callClient(this.client, ["delete", "deleteFile", "remove", "unlink"], "delete", {
@@ -341,7 +368,7 @@ export class AuthorityBlobAdapter {
async stat(path, options = {}) {
throwIfAborted(options.signal);
const normalizedPath = normalizeAuthorityBlobPath(path);
const normalizedPath = assertSafeAuthorityBlobPath(path, { allowEmpty: true });
if (!normalizedPath) return normalizeAuthorityBlobReadResult({ exists: false }, "");
try {
const result = await callClient(this.client, ["stat", "head", "metadata"], "stat", {

View File

@@ -245,6 +245,8 @@ export function normalizeAuthorityJobConfig(settings = {}, overrides = {}) {
failOpen: source.authorityFailOpen !== false && source.failOpen !== false,
preferStream: source.authorityJobPreferStream !== false && source.jobStreamPreferred !== false,
pollIntervalMs: normalizeInteger(source.authorityJobPollIntervalMs ?? source.pollIntervalMs, 1200, 250, 30000),
pollMaxIntervalMs: normalizeInteger(source.authorityJobPollMaxIntervalMs ?? source.pollMaxIntervalMs, 5000, 250, 60000),
pollBackoffFactor: Math.min(5, Math.max(1, Number(source.authorityJobPollBackoffFactor ?? source.pollBackoffFactor ?? 1.25) || 1.25)),
waitTimeoutMs: normalizeInteger(source.authorityJobWaitTimeoutMs ?? source.waitTimeoutMs, 0, 0, 3600000),
...overrides,
};
@@ -439,20 +441,75 @@ export class AuthorityJobAdapter {
id,
timeoutMs: normalizeInteger(options.timeoutMs, this.config.waitTimeoutMs, 0, 3600000),
});
return normalizeAuthorityJobRecord(result?.job || result?.result || result);
const normalized = normalizeAuthorityJobRecord(result?.job || result?.result || result);
return {
...normalized,
waitDiagnostics: {
mode: "client",
pollCount: 0,
elapsedMs: 0,
timeoutMs: normalizeInteger(options.timeoutMs, this.config.waitTimeoutMs, 0, 3600000),
terminal: normalized.terminal,
},
};
}
const startedAt = Date.now();
const timeoutMs = normalizeInteger(options.timeoutMs, this.config.waitTimeoutMs, 0, 3600000);
const pollIntervalMs = normalizeInteger(options.pollIntervalMs, this.config.pollIntervalMs, 250, 30000);
const initialPollIntervalMs = normalizeInteger(options.pollIntervalMs, this.config.pollIntervalMs, 250, 30000);
const maxPollIntervalMs = Math.max(
initialPollIntervalMs,
normalizeInteger(options.pollMaxIntervalMs, this.config.pollMaxIntervalMs, 250, 60000),
);
const backoffFactor = Math.min(5, Math.max(1, Number(options.pollBackoffFactor ?? this.config.pollBackoffFactor) || 1));
let pollIntervalMs = initialPollIntervalMs;
let pollCount = 0;
let lastJob = normalizeAuthorityJobRecord(null);
while (true) {
throwIfAborted(options.signal);
const job = await this.get(id, options);
if (job.terminal) return job;
if (timeoutMs > 0 && Date.now() - startedAt >= timeoutMs) {
return { ...job, status: "timeout", terminal: true, success: false, error: "wait timeout" };
pollCount += 1;
lastJob = job;
const elapsedMs = Date.now() - startedAt;
if (job.terminal) {
return {
...job,
waitDiagnostics: {
mode: "poll",
pollCount,
elapsedMs,
timeoutMs,
pollIntervalMs: initialPollIntervalMs,
maxPollIntervalMs,
backoffFactor,
terminal: true,
},
};
}
await sleep(pollIntervalMs, options.signal);
if (timeoutMs > 0 && elapsedMs >= timeoutMs) {
return {
...job,
status: "timeout",
terminal: true,
success: false,
error: "wait timeout",
waitDiagnostics: {
mode: "poll",
pollCount,
elapsedMs,
timeoutMs,
pollIntervalMs: initialPollIntervalMs,
maxPollIntervalMs,
backoffFactor,
terminal: false,
lastStatus: lastJob.status,
lastProgress: lastJob.progress,
},
};
}
const remainingMs = timeoutMs > 0 ? Math.max(0, timeoutMs - elapsedMs) : pollIntervalMs;
await sleep(timeoutMs > 0 ? Math.min(pollIntervalMs, remainingMs) : pollIntervalMs, options.signal);
pollIntervalMs = Math.min(maxPollIntervalMs, Math.max(initialPollIntervalMs, Math.ceil(pollIntervalMs * backoffFactor)));
}
}

View File

@@ -31,6 +31,15 @@ function normalizeHeaderName(name = "") {
return String(name || "").trim().toLowerCase();
}
function normalizeTimeoutMs(value, fallbackValue = 0) {
const parsed = Number(value);
if (!Number.isFinite(parsed) || parsed <= 0) {
const fallback = Number(fallbackValue);
return Number.isFinite(fallback) && fallback > 0 ? Math.floor(fallback) : 0;
}
return Math.floor(parsed);
}
function hasSessionHeader(headers = {}) {
return Object.keys(headers || {}).some((name) => normalizeHeaderName(name) === AUTHORITY_SESSION_HEADER);
}
@@ -59,6 +68,72 @@ function readPayloadErrorMessage(payload = null, fallback = "") {
return String(payload.error || payload.message || payload.reason || fallback || "");
}
function readPayloadCode(payload = null) {
if (!payload || typeof payload !== "object" || Array.isArray(payload)) return "";
return String(payload.code || payload.reason || payload.category || payload.errorCode || "").trim().toLowerCase();
}
function isSessionRetryCandidate(status = 0, payload = null) {
const numericStatus = Number(status || 0);
if (numericStatus === 401) return true;
if (numericStatus !== 403) return false;
const code = readPayloadCode(payload);
const message = readPayloadErrorMessage(payload, "").toLowerCase();
return /session|token/.test(`${code} ${message}`) && /invalid|expired|missing|unauthorized/.test(`${code} ${message}`);
}
function classifyAuthorityError({ status = 0, payload = null, error = null, timedOut = false, aborted = false } = {}) {
const numericStatus = Number(status || 0);
const payloadCategory = String(payload?.category || "").trim();
if (payloadCategory) return payloadCategory;
if (timedOut || numericStatus === 408) return "timeout";
if (aborted) return "aborted";
if (isSessionRetryCandidate(numericStatus, payload)) return "session";
if (numericStatus === 403) return "permission";
if (numericStatus === 404) return "not-found";
if (numericStatus === 413) return "payload-too-large";
if (numericStatus === 429) return "rate-limit";
if (numericStatus >= 500) return "server";
if (numericStatus >= 400) return "validation";
if (error) return "network";
return "";
}
function createRequestSignal(signal = undefined, timeoutMs = 0) {
const normalizedTimeoutMs = normalizeTimeoutMs(timeoutMs, 0);
if (!signal && normalizedTimeoutMs <= 0) {
return { signal: undefined, cleanup: () => {}, timedOut: () => false };
}
if (typeof AbortController !== "function") {
return { signal, cleanup: () => {}, timedOut: () => false };
}
const controller = new AbortController();
let timeoutId = null;
let timedOut = false;
const abortFromSignal = () => {
controller.abort(signal?.reason || Object.assign(new Error("Authority request aborted"), { name: "AbortError" }));
};
if (signal?.aborted) {
abortFromSignal();
} else if (signal) {
signal.addEventListener("abort", abortFromSignal, { once: true });
}
if (normalizedTimeoutMs > 0) {
timeoutId = setTimeout(() => {
timedOut = true;
controller.abort(Object.assign(new Error("Authority request timed out"), { name: "AbortError" }));
}, normalizedTimeoutMs);
}
return {
signal: controller.signal,
cleanup: () => {
if (timeoutId != null) clearTimeout(timeoutId);
if (signal) signal.removeEventListener("abort", abortFromSignal);
},
timedOut: () => timedOut,
};
}
async function readResponsePayload(response = null) {
if (!response) return {};
const contentType = String(response.headers?.get?.("content-type") || "").toLowerCase();
@@ -91,7 +166,7 @@ export class AuthorityHttpError extends Error {
this.name = "AuthorityHttpError";
this.status = Number(options.status || 0);
this.code = String(options.code || "");
this.category = String(options.category || "");
this.category = String(options.category || classifyAuthorityError(options));
this.payload = clonePlain(options.payload, null);
this.path = String(options.path || "");
this.protocol = String(options.protocol || "");
@@ -107,6 +182,7 @@ export class AuthorityHttpClient {
this.sessionToken = String(options.sessionToken || options.authoritySessionToken || "");
this.sessionInitConfig = buildDefaultSessionInitConfig(options.sessionInitConfig || options.initConfig || options);
this.sessionPromise = null;
this.timeoutMs = normalizeTimeoutMs(options.timeoutMs ?? options.authorityTimeoutMs, 0);
}
async buildHeaders({ session = false } = {}) {
@@ -154,6 +230,10 @@ export class AuthorityHttpClient {
}
async requestJson(path, options = {}) {
return await this._requestJson(path, options, { allowSessionRetry: true });
}
async _requestJson(path, options = {}, state = {}) {
if (typeof this.fetchImpl !== "function") {
throw new AuthorityHttpError("Authority fetch unavailable", {
path,
@@ -166,20 +246,54 @@ export class AuthorityHttpClient {
await this.ensureSession();
}
const headers = await this.buildHeaders({ session });
const response = await this.fetchImpl(`${this.baseUrl}${path}`, {
method,
headers,
...(method === "GET" || options.body === undefined ? {} : { body: JSON.stringify(options.body) }),
...(options.signal ? { signal: options.signal } : {}),
});
const requestSignal = createRequestSignal(options.signal, normalizeTimeoutMs(options.timeoutMs, this.timeoutMs));
let response = null;
let payload = {};
try {
response = await this.fetchImpl(`${this.baseUrl}${path}`, {
method,
headers,
...(method === "GET" || options.body === undefined ? {} : { body: JSON.stringify(options.body) }),
...(requestSignal.signal ? { signal: requestSignal.signal } : {}),
});
payload = await readResponsePayload(response);
} catch (error) {
const timedOut = requestSignal.timedOut();
const aborted = error?.name === "AbortError" && !timedOut;
throw new AuthorityHttpError(
timedOut
? `Authority request timed out after ${normalizeTimeoutMs(options.timeoutMs, this.timeoutMs)}ms`
: error?.message || String(error) || "Authority request failed",
{
status: 0,
code: timedOut ? "timeout" : aborted ? "aborted" : "network-error",
category: classifyAuthorityError({ error, timedOut, aborted }),
payload: null,
path,
protocol: options.protocol || this.protocol,
},
);
} finally {
requestSignal.cleanup();
}
const status = Number(response?.status || 0);
const payload = await readResponsePayload(response);
if (!response?.ok) {
const message = readPayloadErrorMessage(payload, `Authority HTTP ${status || "unknown"}`);
if (
session &&
state.allowSessionRetry !== false &&
options.retrySession !== false &&
isSessionRetryCandidate(status, payload)
) {
this.sessionToken = "";
this.sessionPromise = null;
await this.ensureSession();
return await this._requestJson(path, options, { allowSessionRetry: false });
}
throw new AuthorityHttpError(message || `Authority HTTP ${status || "unknown"}`, {
status,
code: payload?.code,
category: payload?.category,
category: classifyAuthorityError({ status, payload }),
payload,
path,
protocol: options.protocol || this.protocol,

View File

@@ -260,6 +260,18 @@ async function testAdapterBasics() {
assert.deepEqual(readResult.payload, { hello: "world" });
const deleteResult = await adapter.delete("user/files/demo.json");
assert.equal(deleteResult.deleted, true);
await assert.rejects(
() => adapter.writeJson("../secret.json", {}),
/Unsafe Authority Blob path/,
);
await assert.rejects(
() => adapter.readJson("user/files/%2e%2e/secret.json"),
/Unsafe Authority Blob path/,
);
await assert.rejects(
() => adapter.stat("C:/Users/demo.json"),
/Unsafe Authority Blob path/,
);
}
async function testAuthorityBlobFailOpenFallsBackToUserFiles() {

View File

@@ -0,0 +1,103 @@
import assert from "node:assert/strict";
import {
AUTHORITY_SESSION_HEADER,
AuthorityHttpClient,
AuthorityHttpError,
} from "../runtime/authority-http-client.js";
function jsonResponse(status, payload) {
return {
ok: status >= 200 && status < 300,
status,
headers: {
get(name) {
return String(name || "").toLowerCase() === "content-type" ? "application/json" : "";
},
},
async json() {
return payload;
},
};
}
{
const calls = [];
const client = new AuthorityHttpClient({
baseUrl: "https://authority.example.test/root",
fetchImpl: async (url, options = {}) => {
calls.push({ url, options });
if (url.endsWith("/session/init") && calls.filter((call) => call.url.endsWith("/session/init")).length === 1) {
return jsonResponse(200, { sessionToken: "old-session" });
}
if (url.endsWith("/session/init")) {
return jsonResponse(200, { sessionToken: "new-session" });
}
if (url.endsWith("/data") && options.headers?.[AUTHORITY_SESSION_HEADER] === "old-session") {
return jsonResponse(401, { code: "session-expired", message: "session expired" });
}
if (url.endsWith("/data") && options.headers?.[AUTHORITY_SESSION_HEADER] === "new-session") {
return jsonResponse(200, { ok: true, value: 42 });
}
return jsonResponse(500, { error: "unexpected" });
},
});
const result = await client.requestJson("/data", { session: true, body: { q: 1 } });
assert.deepEqual(result, { ok: true, value: 42 });
assert.deepEqual(
calls.map((call) => [call.url, call.options.headers?.[AUTHORITY_SESSION_HEADER] || ""]),
[
["https://authority.example.test/root/session/init", ""],
["https://authority.example.test/root/data", "old-session"],
["https://authority.example.test/root/session/init", ""],
["https://authority.example.test/root/data", "new-session"],
],
);
}
{
const calls = [];
const client = new AuthorityHttpClient({
baseUrl: "https://authority.example.test/root",
fetchImpl: async (url, options = {}) => {
calls.push({ url, options });
if (url.endsWith("/session/init")) {
return jsonResponse(200, { sessionToken: "permission-session" });
}
return jsonResponse(403, { code: "permission-denied", message: "permission denied" });
},
});
await assert.rejects(
() => client.requestJson("/private", { session: true, body: {} }),
(error) => {
assert.equal(error instanceof AuthorityHttpError, true);
assert.equal(error.status, 403);
assert.equal(error.category, "permission");
return true;
},
);
assert.equal(calls.filter((call) => call.url.endsWith("/session/init")).length, 1);
}
{
const client = new AuthorityHttpClient({
baseUrl: "https://authority.example.test/root",
timeoutMs: 5,
fetchImpl: async (_url, options = {}) => await new Promise((_resolve, reject) => {
options.signal?.addEventListener("abort", () => {
reject(Object.assign(new Error("aborted"), { name: "AbortError" }));
}, { once: true });
}),
});
await assert.rejects(
() => client.requestJson("/slow", { session: false }),
(error) => {
assert.equal(error instanceof AuthorityHttpError, true);
assert.equal(error.category, "timeout");
assert.equal(error.code, "timeout");
return true;
},
);
}
console.log("authority-http-client tests passed");

View File

@@ -135,6 +135,9 @@ assert.equal(submitted.idempotencyKey, idempotencyKey);
const completed = await adapter.waitForCompletion(submitted.id, { timeoutMs: 1000 });
assert.equal(completed.status, "completed");
assert.equal(completed.success, true);
assert.equal(completed.waitDiagnostics.mode, "poll");
assert.equal(completed.waitDiagnostics.pollCount, 1);
assert.equal(completed.waitDiagnostics.terminal, true);
const page = await adapter.listPage({ limit: 10 });
assert.equal(page.jobs.length, 1);
@@ -310,6 +313,36 @@ assert.equal(timedOutJob.status, "timeout");
assert.equal(timedOutJob.terminal, true);
assert.equal(timedOutJob.success, false);
let adapterTimeoutPolls = 0;
const timeoutAdapter = createAuthorityJobAdapter(
{
authorityBaseUrl: "/api/plugins/authority",
authorityJobPollIntervalMs: 1,
authorityJobPollMaxIntervalMs: 2,
authorityJobPollBackoffFactor: 2,
},
{
jobClient: {
async get(payload = {}) {
adapterTimeoutPolls += 1;
return {
job: {
id: payload.jobId,
status: "running",
progress: 0.4,
},
};
},
},
},
);
const adapterTimedOutJob = await timeoutAdapter.waitForCompletion("job-wait-timeout", { timeoutMs: 1 });
assert.equal(adapterTimedOutJob.status, "timeout");
assert.equal(adapterTimedOutJob.waitDiagnostics.mode, "poll");
assert.equal(adapterTimedOutJob.waitDiagnostics.pollCount >= 1, true);
assert.equal(adapterTimedOutJob.waitDiagnostics.lastStatus, "running");
assert.equal(adapterTimeoutPolls >= 1, true);
const streamingClient = {
async streamJob(payload) {
return (async function* () {

View File

@@ -72,7 +72,18 @@ function createMockTriviumClient({ failBulkUpsert = false } = {}) {
calls,
async purge(payload) {
calls.push(["purge", payload]);
return { ok: true };
return {
ok: true,
diagnostics: {
operation: "purge",
pageSize: payload.purgePageSize || 200,
maxPages: payload.purgeMaxPages || 1000,
pages: 1,
scanned: 0,
deleted: 0,
truncated: false,
},
};
},
async bulkUpsert(payload) {
calls.push(["bulkUpsert", payload]);
@@ -174,6 +185,13 @@ assert.equal(isAuthorityVectorConfig(config), true);
const linkCall = triviumClient.calls.find(([name]) => name === "linkMany");
assert.equal(linkCall?.[1]?.links?.[0]?.fromId, "node-a");
assert.equal(linkCall?.[1]?.links?.[0]?.toId, "node-b");
assert.equal(result.timings.authorityDiagnostics.purge.operation, "purge");
assert.equal(result.timings.authorityDiagnostics.upsert.operation, "bulkUpsert");
assert.equal(result.timings.authorityDiagnostics.upsert.chunks.length, 2);
assert.equal(result.timings.authorityDiagnostics.upsert.chunks.every((chunk) => chunk.ok), true);
assert.ok(result.timings.authorityDiagnostics.upsert.totalBytes > 0);
assert.equal(result.timings.authorityDiagnostics.link.operation, "linkMany");
assert.equal(result.timings.authorityDiagnostics.link.totalItems, 1);
}
{
@@ -230,11 +248,15 @@ assert.equal(isAuthorityVectorConfig(config), true);
archived: false,
ownerKeys: ["character:Alice"],
},
candidateIds: ["node-a"],
searchText: "Alice archive",
});
assert.deepEqual(filteredIds, ["node-a", "node-b"]);
const filterCall = triviumClient.calls.find(([name]) => name === "filterWhere");
assert.equal(filterCall?.[1]?.collectionId, "authority-filter");
assert.equal(filterCall?.[1]?.filters?.ownerKeys?.[0], "character:Alice");
assert.deepEqual(filterCall?.[1]?.candidateIds, ["node-a"]);
assert.equal(filterCall?.[1]?.searchText, "Alice archive");
}
{

View File

@@ -11,6 +11,8 @@ export const AUTHORITY_VECTOR_SOURCE = "authority-trivium";
const DEFAULT_AUTHORITY_TRIVIUM_DATABASE = "st_bme_vectors";
const DEFAULT_AUTHORITY_VECTOR_CHUNK_SIZE = 1000;
const MAX_AUTHORITY_VECTOR_CHUNK_SIZE = 2000;
const DEFAULT_AUTHORITY_PURGE_PAGE_SIZE = 200;
const DEFAULT_AUTHORITY_PURGE_MAX_PAGES = 1000;
const DEFAULT_AUTHORITY_EMBEDDING_BACKEND_SOURCE = "openai";
function clampInteger(value, fallback, min, max) {
@@ -23,6 +25,17 @@ function toArray(value) {
return Array.isArray(value) ? value : [];
}
function nowMs() {
if (typeof performance?.now === "function") {
return performance.now();
}
return Date.now();
}
function roundMs(value) {
return Math.round((Number(value) || 0) * 10) / 10;
}
function clonePlain(value, fallbackValue = null) {
if (value == null) return fallbackValue;
if (typeof globalThis.structuredClone === "function") {
@@ -56,6 +69,26 @@ function normalizePositiveInteger(value, fallback = 0) {
return Math.floor(parsed);
}
function estimateJsonBytes(value = null) {
try {
const text = JSON.stringify(value ?? null);
if (typeof TextEncoder === "function") {
return new TextEncoder().encode(text).length;
}
return text.length;
} catch {
return 0;
}
}
function isPlainObject(value = null) {
return Boolean(value && typeof value === "object" && !Array.isArray(value));
}
function hasPlainKeys(value = null) {
return isPlainObject(value) && Object.keys(value).length > 0;
}
function normalizeOpenAICompatibleBaseUrl(value) {
return String(value || "")
.trim()
@@ -329,6 +362,18 @@ export function normalizeAuthorityVectorConfig(settings = {}, overrides = {}) {
1,
MAX_AUTHORITY_VECTOR_CHUNK_SIZE,
),
purgePageSize: clampInteger(
source.authorityTriviumPurgePageSize ?? source.authorityVectorPurgePageSize,
DEFAULT_AUTHORITY_PURGE_PAGE_SIZE,
1,
1000,
),
purgeMaxPages: clampInteger(
source.authorityTriviumPurgeMaxPages ?? source.authorityVectorPurgeMaxPages,
DEFAULT_AUTHORITY_PURGE_MAX_PAGES,
1,
100000,
),
timeoutMs: Math.max(0, Number(source.timeoutMs || 0) || 0),
failOpen: source.authorityVectorFailOpen !== false && source.failOpen !== false,
...overrides,
@@ -347,6 +392,8 @@ export class AuthorityTriviumHttpClient {
dtype: String(options.dtype || "").trim(),
syncMode: String(options.syncMode || "").trim(),
storageMode: String(options.storageMode || "").trim(),
purgePageSize: clampInteger(options.purgePageSize, DEFAULT_AUTHORITY_PURGE_PAGE_SIZE, 1, 1000),
purgeMaxPages: clampInteger(options.purgeMaxPages, DEFAULT_AUTHORITY_PURGE_MAX_PAGES, 1, 100000),
};
this.http = new AuthorityHttpClient({
...options,
@@ -386,15 +433,34 @@ export class AuthorityTriviumHttpClient {
async purge(payload = {}) {
const namespace = getNamespace(payload);
const openOptions = this.buildOpenOptions(payload);
const pageSize = clampInteger(
payload.pageSize ?? payload.limit ?? payload.purgePageSize ?? this.config.purgePageSize,
DEFAULT_AUTHORITY_PURGE_PAGE_SIZE,
1,
1000,
);
const maxPages = clampInteger(
payload.maxPages ?? payload.purgeMaxPages ?? this.config.purgeMaxPages,
DEFAULT_AUTHORITY_PURGE_MAX_PAGES,
1,
100000,
);
const startedAt = nowMs();
let cursor = "";
let deleted = 0;
let scanned = 0;
for (let pageIndex = 0; pageIndex < 100; pageIndex++) {
let pages = 0;
let truncated = false;
for (let pageIndex = 0; pageIndex < maxPages; pageIndex++) {
const page = await this.requestV06("/trivium/list-mappings", {
...openOptions,
namespace,
page: { cursor, limit: 200 },
page: {
...(cursor ? { cursor } : {}),
limit: pageSize,
},
});
pages += 1;
const mappings = toArray(page?.mappings);
if (!mappings.length && !page?.page?.hasMore) break;
scanned += mappings.length;
@@ -411,8 +477,28 @@ export class AuthorityTriviumHttpClient {
if (!page?.page?.hasMore) break;
cursor = String(page?.page?.nextCursor || "");
if (!cursor) break;
if (pageIndex === maxPages - 1) truncated = true;
}
return { ok: true, scanned, deleted };
return {
ok: !truncated,
scanned,
deleted,
pages,
truncated,
nextCursor: truncated ? cursor : "",
diagnostics: {
operation: "purge",
namespace,
pageSize,
maxPages,
pages,
scanned,
deleted,
truncated,
nextCursor: truncated ? cursor : "",
totalMs: roundMs(nowMs() - startedAt),
},
};
}
async bulkUpsert(payload = {}) {
@@ -506,12 +592,21 @@ export class AuthorityTriviumHttpClient {
async filterWhere(payload = {}) {
const namespace = getNamespace(payload);
const filters = payload.filters || payload.filter || payload.where || null;
const payloadFilter = payload.payloadFilter || filters;
const candidateIds = toArray(payload.candidateIds).map(normalizeRecordId).filter(Boolean);
const query = String(payload.query || payload.searchText || "").trim();
const result = await this.requestV06("/trivium/list-mappings", {
...this.buildOpenOptions(payload),
namespace,
page: {
...(payload.cursor ? { cursor: String(payload.cursor) } : {}),
limit: Number(payload.limit || payload.topK || payload.pageSize || 100) || 100,
},
...(hasPlainKeys(filters) ? { filters, where: filters } : {}),
...(hasPlainKeys(payloadFilter) ? { payloadFilter } : {}),
...(candidateIds.length ? { candidateIds } : {}),
...(query ? { query, searchText: query } : {}),
});
return { items: toArray(result?.mappings) };
}
@@ -598,21 +693,44 @@ export async function purgeAuthorityTriviumNamespace(config = {}, options = {})
namespace: options.namespace,
collectionId: options.collectionId,
chatId: options.chatId,
purgePageSize: options.purgePageSize,
purgeMaxPages: options.purgeMaxPages,
});
}
export async function deleteAuthorityTriviumNodes(config = {}, nodeIds = [], options = {}) {
const ids = toArray(nodeIds).map(normalizeRecordId).filter(Boolean);
if (!ids.length) return { deleted: 0 };
if (!ids.length) {
return {
deleted: 0,
diagnostics: {
operation: "deleteMany",
requested: 0,
deleted: 0,
totalMs: 0,
},
};
}
throwIfAborted(options.signal);
const client = createAuthorityTriviumClient(config, options);
return await callClient(client, ["deleteMany", "deleteNodes"], "deleteMany", {
const startedAt = nowMs();
const result = await callClient(client, ["deleteMany", "deleteNodes"], "deleteMany", {
namespace: options.namespace,
collectionId: options.collectionId,
chatId: options.chatId,
ids,
externalIds: ids,
});
return {
...result,
deleted: Number(result?.deleted ?? result?.successCount ?? ids.length) || 0,
diagnostics: {
operation: "deleteMany",
requested: ids.length,
deleted: Number(result?.deleted ?? result?.successCount ?? ids.length) || 0,
totalMs: roundMs(nowMs() - startedAt),
},
};
}
export async function filterAuthorityTriviumNodes(config = {}, options = {}) {
@@ -642,37 +760,122 @@ export async function filterAuthorityTriviumNodes(config = {}, options = {}) {
export async function upsertAuthorityTriviumEntries(graph, config = {}, entries = [], options = {}) {
const items = buildAuthorityVectorItems(graph, entries, options);
if (!items.length) return { upserted: 0 };
if (!items.length) {
return {
upserted: 0,
diagnostics: {
operation: "bulkUpsert",
totalItems: 0,
chunkSize: 0,
chunks: [],
totalBytes: 0,
totalMs: 0,
},
};
}
throwIfAborted(options.signal);
const client = createAuthorityTriviumClient(config, options);
const chunkSize = clampInteger(config.chunkSize, DEFAULT_AUTHORITY_VECTOR_CHUNK_SIZE, 1, MAX_AUTHORITY_VECTOR_CHUNK_SIZE);
let upserted = 0;
let totalBytes = 0;
const chunks = [];
const startedAt = nowMs();
for (let index = 0; index < items.length; index += chunkSize) {
throwIfAborted(options.signal);
const chunk = items.slice(index, index + chunkSize);
await callClient(client, ["bulkUpsert", "upsertMany", "upsert"], "bulkUpsert", {
namespace: options.namespace,
collectionId: options.collectionId,
chatId: options.chatId,
items: chunk,
});
upserted += chunk.length;
const chunkStartedAt = nowMs();
const estimatedBytes = estimateJsonBytes(chunk);
totalBytes += estimatedBytes;
try {
const result = await callClient(client, ["bulkUpsert", "upsertMany", "upsert"], "bulkUpsert", {
namespace: options.namespace,
collectionId: options.collectionId,
chatId: options.chatId,
items: chunk,
});
const successCount = Number(result?.successCount ?? result?.upserted ?? chunk.length) || chunk.length;
upserted += successCount;
chunks.push({
index: chunks.length,
offset: index,
itemCount: chunk.length,
upserted: successCount,
vectorDim: normalizeVector(chunk[0]?.vector || chunk[0]?.embedding).length,
estimatedBytes,
durationMs: roundMs(nowMs() - chunkStartedAt),
ok: true,
});
} catch (error) {
chunks.push({
index: chunks.length,
offset: index,
itemCount: chunk.length,
upserted: 0,
vectorDim: normalizeVector(chunk[0]?.vector || chunk[0]?.embedding).length,
estimatedBytes,
durationMs: roundMs(nowMs() - chunkStartedAt),
ok: false,
error: error?.message || String(error),
});
error.authorityDiagnostics = {
operation: "bulkUpsert",
totalItems: items.length,
chunkSize,
chunks,
totalBytes,
totalMs: roundMs(nowMs() - startedAt),
};
throw error;
}
}
return { upserted };
return {
upserted,
diagnostics: {
operation: "bulkUpsert",
totalItems: items.length,
chunkSize,
chunks,
totalBytes,
totalMs: roundMs(nowMs() - startedAt),
},
};
}
export async function syncAuthorityTriviumLinks(graph, config = {}, options = {}) {
const links = buildAuthorityLinkItems(graph, options);
if (!links.length) return { linked: 0 };
if (!links.length) {
return {
linked: 0,
diagnostics: {
operation: "linkMany",
totalItems: 0,
estimatedBytes: 0,
totalMs: 0,
},
};
}
throwIfAborted(options.signal);
const client = createAuthorityTriviumClient(config, options);
await callClient(client, ["linkMany", "upsertLinks"], "linkMany", {
const startedAt = nowMs();
const estimatedBytes = estimateJsonBytes(links);
const result = await callClient(client, ["linkMany", "upsertLinks"], "linkMany", {
namespace: options.namespace,
collectionId: options.collectionId,
chatId: options.chatId,
links,
});
return { linked: links.length };
const linked = Number(result?.linked ?? result?.successCount ?? links.length) || links.length;
return {
...result,
linked,
diagnostics: {
operation: "linkMany",
totalItems: links.length,
linked,
estimatedBytes,
totalMs: roundMs(nowMs() - startedAt),
},
};
}
export async function queryAuthorityTriviumNeighbors(config = {}, nodeIds = [], options = {}) {

View File

@@ -764,6 +764,10 @@ export async function syncGraphVectorIndex(
let authorityDeleteMs = 0;
let authorityUpsertMs = 0;
let authorityLinkMs = 0;
let authorityPurgeDiagnostics = null;
let authorityDeleteDiagnostics = null;
let authorityUpsertDiagnostics = null;
let authorityLinkDiagnostics = null;
let embedBatchMs = 0;
let deletedHashCount = 0;
let deletedNodeCount = 0;
@@ -801,17 +805,22 @@ export async function syncGraphVectorIndex(
throw new Error(`Authority Trivium embedding failed for ${embeddingResult.failures} item(s)`);
}
const purgeStartedAt = nowMs();
await purgeAuthorityTriviumNamespace(config, authorityOptions);
const purgeResult = await purgeAuthorityTriviumNamespace(config, authorityOptions);
authorityPurgeMs += nowMs() - purgeStartedAt;
authorityPurgeDiagnostics = purgeResult?.diagnostics || null;
if (purgeResult?.truncated) {
throw new Error(`Authority Trivium purge truncated after ${purgeResult.pages || 0} page(s)`);
}
resetVectorMappings(graph, config, effectiveChatId);
const upsertStartedAt = nowMs();
await upsertAuthorityTriviumEntries(
const upsertResult = await upsertAuthorityTriviumEntries(
graph,
config,
desiredEntries,
authorityOptions,
);
authorityUpsertMs += nowMs() - upsertStartedAt;
authorityUpsertDiagnostics = upsertResult?.diagnostics || null;
for (const entry of desiredEntries) {
state.hashToNodeId[entry.hash] = entry.nodeId;
state.nodeToHash[entry.nodeId] = entry.hash;
@@ -861,16 +870,18 @@ export async function syncGraphVectorIndex(
}
deletedNodeCount = nodeIdsToDelete.length;
const deleteStartedAt = nowMs();
await deleteAuthorityTriviumNodes(config, nodeIdsToDelete, authorityOptions);
const deleteResult = await deleteAuthorityTriviumNodes(config, nodeIdsToDelete, authorityOptions);
authorityDeleteMs += nowMs() - deleteStartedAt;
authorityDeleteDiagnostics = deleteResult?.diagnostics || null;
const upsertStartedAt = nowMs();
await upsertAuthorityTriviumEntries(
const upsertResult = await upsertAuthorityTriviumEntries(
graph,
config,
entriesToUpsert,
authorityOptions,
);
authorityUpsertMs += nowMs() - upsertStartedAt;
authorityUpsertDiagnostics = upsertResult?.diagnostics || null;
for (const entry of entriesToUpsert) {
state.hashToNodeId[entry.hash] = entry.nodeId;
@@ -880,8 +891,9 @@ export async function syncGraphVectorIndex(
}
const linkStartedAt = nowMs();
await syncAuthorityTriviumLinks(graph, config, authorityOptions);
const linkResult = await syncAuthorityTriviumLinks(graph, config, authorityOptions);
authorityLinkMs += nowMs() - linkStartedAt;
authorityLinkDiagnostics = linkResult?.diagnostics || null;
for (const node of graph.nodes || []) {
if (Array.isArray(node.embedding) && node.embedding.length > 0) {
@@ -914,6 +926,12 @@ export async function syncGraphVectorIndex(
authorityDeleteMs: roundMs(authorityDeleteMs),
authorityUpsertMs: roundMs(authorityUpsertMs),
authorityLinkMs: roundMs(authorityLinkMs),
authorityDiagnostics: {
purge: authorityPurgeDiagnostics,
delete: authorityDeleteDiagnostics,
upsert: error?.authorityDiagnostics || authorityUpsertDiagnostics,
link: authorityLinkDiagnostics,
},
totalMs: roundMs(nowMs() - syncStartedAt),
updatedAt: Date.now(),
};
@@ -1106,6 +1124,12 @@ export async function syncGraphVectorIndex(
authorityDeleteMs: roundMs(authorityDeleteMs),
authorityUpsertMs: roundMs(authorityUpsertMs),
authorityLinkMs: roundMs(authorityLinkMs),
authorityDiagnostics: {
purge: authorityPurgeDiagnostics,
delete: authorityDeleteDiagnostics,
upsert: authorityUpsertDiagnostics,
link: authorityLinkDiagnostics,
},
embedBatchMs: roundMs(embedBatchMs),
statsBuildMs: roundMs(statsBuildMs),
deletedHashes: Math.max(0, Math.floor(deletedHashCount)),