diff --git a/maintenance/authority-blob-adapter.js b/maintenance/authority-blob-adapter.js index 2a34f90..2f67190 100644 --- a/maintenance/authority-blob-adapter.js +++ b/maintenance/authority-blob-adapter.js @@ -61,6 +61,31 @@ export function normalizeAuthorityBlobPath(path = "") { return normalized.replace(/\/+$/g, ""); } +function decodePathForValidation(path = "") { + try { + return decodeURIComponent(String(path || "")); + } catch { + return String(path || ""); + } +} + +function assertSafeAuthorityBlobPath(path = "", options = {}) { + const normalized = normalizeAuthorityBlobPath(path); + if (!normalized) { + if (options.allowEmpty) return ""; + throw new Error("Authority Blob path is required"); + } + const decoded = decodePathForValidation(normalized).replace(/\\/g, "/"); + if (/^[A-Za-z]:(?:\/|$)/.test(decoded) || decoded.includes(":/")) { + throw new Error(`Unsafe Authority Blob path: ${normalized}`); + } + const segments = decoded.split("/").filter(Boolean); + if (segments.some((segment) => segment === "." || segment === "..")) { + throw new Error(`Unsafe Authority Blob path: ${normalized}`); + } + return normalized; +} + function normalizeBlobPayload(result = null) { if (!result || typeof result !== "object" || Array.isArray(result)) return result; const source = result.file || result.blob || result.result || result; @@ -197,8 +222,9 @@ export class AuthorityBlobHttpClient { } async writeText(payload = {}) { + const path = assertSafeAuthorityBlobPath(payload.path || payload.name); return await this.request(`${AUTHORITY_BLOB_ENDPOINT}/write-file`, { - path: normalizeAuthorityBlobPath(payload.path || payload.name), + path, content: String(payload.text ?? payload.data ?? payload.content ?? ""), encoding: "utf8", createParents: true, @@ -206,22 +232,25 @@ export class AuthorityBlobHttpClient { } async readJson(payload = {}) { + const path = assertSafeAuthorityBlobPath(payload.path || payload.name); return await this.request(`${AUTHORITY_BLOB_ENDPOINT}/read-file`, { - path: normalizeAuthorityBlobPath(payload.path || payload.name), + path, encoding: "utf8", }, { signal: payload.signal }); } async delete(payload = {}) { + const path = assertSafeAuthorityBlobPath(payload.path || payload.name); return await this.request(`${AUTHORITY_BLOB_ENDPOINT}/delete`, { - path: normalizeAuthorityBlobPath(payload.path || payload.name), + path, recursive: false, }, { signal: payload.signal }); } async stat(payload = {}) { + const path = assertSafeAuthorityBlobPath(payload.path || payload.name); return await this.request(`${AUTHORITY_BLOB_ENDPOINT}/stat`, { - path: normalizeAuthorityBlobPath(payload.path || payload.name), + path, }, { signal: payload.signal }); } } @@ -271,8 +300,7 @@ export class AuthorityBlobAdapter { async writeJson(path, payload = null, options = {}) { throwIfAborted(options.signal); - const normalizedPath = normalizeAuthorityBlobPath(path); - if (!normalizedPath) throw new Error("Authority Blob path is required"); + const normalizedPath = assertSafeAuthorityBlobPath(path); const result = await callClient(this.client, ["writeJson", "putJson", "writeFile", "put"], "writeJson", { namespace: options.namespace || this.config.namespace, path: normalizedPath, @@ -287,8 +315,7 @@ export class AuthorityBlobAdapter { async writeText(path, text = "", options = {}) { throwIfAborted(options.signal); - const normalizedPath = normalizeAuthorityBlobPath(path); - if (!normalizedPath) throw new Error("Authority Blob path is required"); + const normalizedPath = assertSafeAuthorityBlobPath(path); const result = await callClient(this.client, ["writeText", "writeFile", "putText", "put"], "writeText", { namespace: options.namespace || this.config.namespace, path: normalizedPath, @@ -303,7 +330,7 @@ export class AuthorityBlobAdapter { async readJson(path, options = {}) { throwIfAborted(options.signal); - const normalizedPath = normalizeAuthorityBlobPath(path); + const normalizedPath = assertSafeAuthorityBlobPath(path, { allowEmpty: true }); if (!normalizedPath) return normalizeAuthorityBlobReadResult({ exists: false }, ""); try { const result = await callClient(this.client, ["readJson", "getJson", "readFile", "get"], "readJson", { @@ -322,7 +349,7 @@ export class AuthorityBlobAdapter { async delete(path, options = {}) { throwIfAborted(options.signal); - const normalizedPath = normalizeAuthorityBlobPath(path); + const normalizedPath = assertSafeAuthorityBlobPath(path, { allowEmpty: true }); if (!normalizedPath) return normalizeAuthorityBlobDeleteResult({ exists: false }, ""); try { const result = await callClient(this.client, ["delete", "deleteFile", "remove", "unlink"], "delete", { @@ -341,7 +368,7 @@ export class AuthorityBlobAdapter { async stat(path, options = {}) { throwIfAborted(options.signal); - const normalizedPath = normalizeAuthorityBlobPath(path); + const normalizedPath = assertSafeAuthorityBlobPath(path, { allowEmpty: true }); if (!normalizedPath) return normalizeAuthorityBlobReadResult({ exists: false }, ""); try { const result = await callClient(this.client, ["stat", "head", "metadata"], "stat", { diff --git a/maintenance/authority-job-adapter.js b/maintenance/authority-job-adapter.js index 3823bb2..91839ea 100644 --- a/maintenance/authority-job-adapter.js +++ b/maintenance/authority-job-adapter.js @@ -245,6 +245,8 @@ export function normalizeAuthorityJobConfig(settings = {}, overrides = {}) { failOpen: source.authorityFailOpen !== false && source.failOpen !== false, preferStream: source.authorityJobPreferStream !== false && source.jobStreamPreferred !== false, pollIntervalMs: normalizeInteger(source.authorityJobPollIntervalMs ?? source.pollIntervalMs, 1200, 250, 30000), + pollMaxIntervalMs: normalizeInteger(source.authorityJobPollMaxIntervalMs ?? source.pollMaxIntervalMs, 5000, 250, 60000), + pollBackoffFactor: Math.min(5, Math.max(1, Number(source.authorityJobPollBackoffFactor ?? source.pollBackoffFactor ?? 1.25) || 1.25)), waitTimeoutMs: normalizeInteger(source.authorityJobWaitTimeoutMs ?? source.waitTimeoutMs, 0, 0, 3600000), ...overrides, }; @@ -439,20 +441,75 @@ export class AuthorityJobAdapter { id, timeoutMs: normalizeInteger(options.timeoutMs, this.config.waitTimeoutMs, 0, 3600000), }); - return normalizeAuthorityJobRecord(result?.job || result?.result || result); + const normalized = normalizeAuthorityJobRecord(result?.job || result?.result || result); + return { + ...normalized, + waitDiagnostics: { + mode: "client", + pollCount: 0, + elapsedMs: 0, + timeoutMs: normalizeInteger(options.timeoutMs, this.config.waitTimeoutMs, 0, 3600000), + terminal: normalized.terminal, + }, + }; } const startedAt = Date.now(); const timeoutMs = normalizeInteger(options.timeoutMs, this.config.waitTimeoutMs, 0, 3600000); - const pollIntervalMs = normalizeInteger(options.pollIntervalMs, this.config.pollIntervalMs, 250, 30000); + const initialPollIntervalMs = normalizeInteger(options.pollIntervalMs, this.config.pollIntervalMs, 250, 30000); + const maxPollIntervalMs = Math.max( + initialPollIntervalMs, + normalizeInteger(options.pollMaxIntervalMs, this.config.pollMaxIntervalMs, 250, 60000), + ); + const backoffFactor = Math.min(5, Math.max(1, Number(options.pollBackoffFactor ?? this.config.pollBackoffFactor) || 1)); + let pollIntervalMs = initialPollIntervalMs; + let pollCount = 0; + let lastJob = normalizeAuthorityJobRecord(null); while (true) { throwIfAborted(options.signal); const job = await this.get(id, options); - if (job.terminal) return job; - if (timeoutMs > 0 && Date.now() - startedAt >= timeoutMs) { - return { ...job, status: "timeout", terminal: true, success: false, error: "wait timeout" }; + pollCount += 1; + lastJob = job; + const elapsedMs = Date.now() - startedAt; + if (job.terminal) { + return { + ...job, + waitDiagnostics: { + mode: "poll", + pollCount, + elapsedMs, + timeoutMs, + pollIntervalMs: initialPollIntervalMs, + maxPollIntervalMs, + backoffFactor, + terminal: true, + }, + }; } - await sleep(pollIntervalMs, options.signal); + if (timeoutMs > 0 && elapsedMs >= timeoutMs) { + return { + ...job, + status: "timeout", + terminal: true, + success: false, + error: "wait timeout", + waitDiagnostics: { + mode: "poll", + pollCount, + elapsedMs, + timeoutMs, + pollIntervalMs: initialPollIntervalMs, + maxPollIntervalMs, + backoffFactor, + terminal: false, + lastStatus: lastJob.status, + lastProgress: lastJob.progress, + }, + }; + } + const remainingMs = timeoutMs > 0 ? Math.max(0, timeoutMs - elapsedMs) : pollIntervalMs; + await sleep(timeoutMs > 0 ? Math.min(pollIntervalMs, remainingMs) : pollIntervalMs, options.signal); + pollIntervalMs = Math.min(maxPollIntervalMs, Math.max(initialPollIntervalMs, Math.ceil(pollIntervalMs * backoffFactor))); } } diff --git a/runtime/authority-http-client.js b/runtime/authority-http-client.js index 4ac5aff..cd246ef 100644 --- a/runtime/authority-http-client.js +++ b/runtime/authority-http-client.js @@ -31,6 +31,15 @@ function normalizeHeaderName(name = "") { return String(name || "").trim().toLowerCase(); } +function normalizeTimeoutMs(value, fallbackValue = 0) { + const parsed = Number(value); + if (!Number.isFinite(parsed) || parsed <= 0) { + const fallback = Number(fallbackValue); + return Number.isFinite(fallback) && fallback > 0 ? Math.floor(fallback) : 0; + } + return Math.floor(parsed); +} + function hasSessionHeader(headers = {}) { return Object.keys(headers || {}).some((name) => normalizeHeaderName(name) === AUTHORITY_SESSION_HEADER); } @@ -59,6 +68,72 @@ function readPayloadErrorMessage(payload = null, fallback = "") { return String(payload.error || payload.message || payload.reason || fallback || ""); } +function readPayloadCode(payload = null) { + if (!payload || typeof payload !== "object" || Array.isArray(payload)) return ""; + return String(payload.code || payload.reason || payload.category || payload.errorCode || "").trim().toLowerCase(); +} + +function isSessionRetryCandidate(status = 0, payload = null) { + const numericStatus = Number(status || 0); + if (numericStatus === 401) return true; + if (numericStatus !== 403) return false; + const code = readPayloadCode(payload); + const message = readPayloadErrorMessage(payload, "").toLowerCase(); + return /session|token/.test(`${code} ${message}`) && /invalid|expired|missing|unauthorized/.test(`${code} ${message}`); +} + +function classifyAuthorityError({ status = 0, payload = null, error = null, timedOut = false, aborted = false } = {}) { + const numericStatus = Number(status || 0); + const payloadCategory = String(payload?.category || "").trim(); + if (payloadCategory) return payloadCategory; + if (timedOut || numericStatus === 408) return "timeout"; + if (aborted) return "aborted"; + if (isSessionRetryCandidate(numericStatus, payload)) return "session"; + if (numericStatus === 403) return "permission"; + if (numericStatus === 404) return "not-found"; + if (numericStatus === 413) return "payload-too-large"; + if (numericStatus === 429) return "rate-limit"; + if (numericStatus >= 500) return "server"; + if (numericStatus >= 400) return "validation"; + if (error) return "network"; + return ""; +} + +function createRequestSignal(signal = undefined, timeoutMs = 0) { + const normalizedTimeoutMs = normalizeTimeoutMs(timeoutMs, 0); + if (!signal && normalizedTimeoutMs <= 0) { + return { signal: undefined, cleanup: () => {}, timedOut: () => false }; + } + if (typeof AbortController !== "function") { + return { signal, cleanup: () => {}, timedOut: () => false }; + } + const controller = new AbortController(); + let timeoutId = null; + let timedOut = false; + const abortFromSignal = () => { + controller.abort(signal?.reason || Object.assign(new Error("Authority request aborted"), { name: "AbortError" })); + }; + if (signal?.aborted) { + abortFromSignal(); + } else if (signal) { + signal.addEventListener("abort", abortFromSignal, { once: true }); + } + if (normalizedTimeoutMs > 0) { + timeoutId = setTimeout(() => { + timedOut = true; + controller.abort(Object.assign(new Error("Authority request timed out"), { name: "AbortError" })); + }, normalizedTimeoutMs); + } + return { + signal: controller.signal, + cleanup: () => { + if (timeoutId != null) clearTimeout(timeoutId); + if (signal) signal.removeEventListener("abort", abortFromSignal); + }, + timedOut: () => timedOut, + }; +} + async function readResponsePayload(response = null) { if (!response) return {}; const contentType = String(response.headers?.get?.("content-type") || "").toLowerCase(); @@ -91,7 +166,7 @@ export class AuthorityHttpError extends Error { this.name = "AuthorityHttpError"; this.status = Number(options.status || 0); this.code = String(options.code || ""); - this.category = String(options.category || ""); + this.category = String(options.category || classifyAuthorityError(options)); this.payload = clonePlain(options.payload, null); this.path = String(options.path || ""); this.protocol = String(options.protocol || ""); @@ -107,6 +182,7 @@ export class AuthorityHttpClient { this.sessionToken = String(options.sessionToken || options.authoritySessionToken || ""); this.sessionInitConfig = buildDefaultSessionInitConfig(options.sessionInitConfig || options.initConfig || options); this.sessionPromise = null; + this.timeoutMs = normalizeTimeoutMs(options.timeoutMs ?? options.authorityTimeoutMs, 0); } async buildHeaders({ session = false } = {}) { @@ -154,6 +230,10 @@ export class AuthorityHttpClient { } async requestJson(path, options = {}) { + return await this._requestJson(path, options, { allowSessionRetry: true }); + } + + async _requestJson(path, options = {}, state = {}) { if (typeof this.fetchImpl !== "function") { throw new AuthorityHttpError("Authority fetch unavailable", { path, @@ -166,20 +246,54 @@ export class AuthorityHttpClient { await this.ensureSession(); } const headers = await this.buildHeaders({ session }); - const response = await this.fetchImpl(`${this.baseUrl}${path}`, { - method, - headers, - ...(method === "GET" || options.body === undefined ? {} : { body: JSON.stringify(options.body) }), - ...(options.signal ? { signal: options.signal } : {}), - }); + const requestSignal = createRequestSignal(options.signal, normalizeTimeoutMs(options.timeoutMs, this.timeoutMs)); + let response = null; + let payload = {}; + try { + response = await this.fetchImpl(`${this.baseUrl}${path}`, { + method, + headers, + ...(method === "GET" || options.body === undefined ? {} : { body: JSON.stringify(options.body) }), + ...(requestSignal.signal ? { signal: requestSignal.signal } : {}), + }); + payload = await readResponsePayload(response); + } catch (error) { + const timedOut = requestSignal.timedOut(); + const aborted = error?.name === "AbortError" && !timedOut; + throw new AuthorityHttpError( + timedOut + ? `Authority request timed out after ${normalizeTimeoutMs(options.timeoutMs, this.timeoutMs)}ms` + : error?.message || String(error) || "Authority request failed", + { + status: 0, + code: timedOut ? "timeout" : aborted ? "aborted" : "network-error", + category: classifyAuthorityError({ error, timedOut, aborted }), + payload: null, + path, + protocol: options.protocol || this.protocol, + }, + ); + } finally { + requestSignal.cleanup(); + } const status = Number(response?.status || 0); - const payload = await readResponsePayload(response); if (!response?.ok) { const message = readPayloadErrorMessage(payload, `Authority HTTP ${status || "unknown"}`); + if ( + session && + state.allowSessionRetry !== false && + options.retrySession !== false && + isSessionRetryCandidate(status, payload) + ) { + this.sessionToken = ""; + this.sessionPromise = null; + await this.ensureSession(); + return await this._requestJson(path, options, { allowSessionRetry: false }); + } throw new AuthorityHttpError(message || `Authority HTTP ${status || "unknown"}`, { status, code: payload?.code, - category: payload?.category, + category: classifyAuthorityError({ status, payload }), payload, path, protocol: options.protocol || this.protocol, diff --git a/tests/authority-blob.mjs b/tests/authority-blob.mjs index 0f736fe..11d92db 100644 --- a/tests/authority-blob.mjs +++ b/tests/authority-blob.mjs @@ -260,6 +260,18 @@ async function testAdapterBasics() { assert.deepEqual(readResult.payload, { hello: "world" }); const deleteResult = await adapter.delete("user/files/demo.json"); assert.equal(deleteResult.deleted, true); + await assert.rejects( + () => adapter.writeJson("../secret.json", {}), + /Unsafe Authority Blob path/, + ); + await assert.rejects( + () => adapter.readJson("user/files/%2e%2e/secret.json"), + /Unsafe Authority Blob path/, + ); + await assert.rejects( + () => adapter.stat("C:/Users/demo.json"), + /Unsafe Authority Blob path/, + ); } async function testAuthorityBlobFailOpenFallsBackToUserFiles() { diff --git a/tests/authority-http-client.mjs b/tests/authority-http-client.mjs new file mode 100644 index 0000000..ecdf43f --- /dev/null +++ b/tests/authority-http-client.mjs @@ -0,0 +1,103 @@ +import assert from "node:assert/strict"; + +import { + AUTHORITY_SESSION_HEADER, + AuthorityHttpClient, + AuthorityHttpError, +} from "../runtime/authority-http-client.js"; + +function jsonResponse(status, payload) { + return { + ok: status >= 200 && status < 300, + status, + headers: { + get(name) { + return String(name || "").toLowerCase() === "content-type" ? "application/json" : ""; + }, + }, + async json() { + return payload; + }, + }; +} + +{ + const calls = []; + const client = new AuthorityHttpClient({ + baseUrl: "https://authority.example.test/root", + fetchImpl: async (url, options = {}) => { + calls.push({ url, options }); + if (url.endsWith("/session/init") && calls.filter((call) => call.url.endsWith("/session/init")).length === 1) { + return jsonResponse(200, { sessionToken: "old-session" }); + } + if (url.endsWith("/session/init")) { + return jsonResponse(200, { sessionToken: "new-session" }); + } + if (url.endsWith("/data") && options.headers?.[AUTHORITY_SESSION_HEADER] === "old-session") { + return jsonResponse(401, { code: "session-expired", message: "session expired" }); + } + if (url.endsWith("/data") && options.headers?.[AUTHORITY_SESSION_HEADER] === "new-session") { + return jsonResponse(200, { ok: true, value: 42 }); + } + return jsonResponse(500, { error: "unexpected" }); + }, + }); + const result = await client.requestJson("/data", { session: true, body: { q: 1 } }); + assert.deepEqual(result, { ok: true, value: 42 }); + assert.deepEqual( + calls.map((call) => [call.url, call.options.headers?.[AUTHORITY_SESSION_HEADER] || ""]), + [ + ["https://authority.example.test/root/session/init", ""], + ["https://authority.example.test/root/data", "old-session"], + ["https://authority.example.test/root/session/init", ""], + ["https://authority.example.test/root/data", "new-session"], + ], + ); +} + +{ + const calls = []; + const client = new AuthorityHttpClient({ + baseUrl: "https://authority.example.test/root", + fetchImpl: async (url, options = {}) => { + calls.push({ url, options }); + if (url.endsWith("/session/init")) { + return jsonResponse(200, { sessionToken: "permission-session" }); + } + return jsonResponse(403, { code: "permission-denied", message: "permission denied" }); + }, + }); + await assert.rejects( + () => client.requestJson("/private", { session: true, body: {} }), + (error) => { + assert.equal(error instanceof AuthorityHttpError, true); + assert.equal(error.status, 403); + assert.equal(error.category, "permission"); + return true; + }, + ); + assert.equal(calls.filter((call) => call.url.endsWith("/session/init")).length, 1); +} + +{ + const client = new AuthorityHttpClient({ + baseUrl: "https://authority.example.test/root", + timeoutMs: 5, + fetchImpl: async (_url, options = {}) => await new Promise((_resolve, reject) => { + options.signal?.addEventListener("abort", () => { + reject(Object.assign(new Error("aborted"), { name: "AbortError" })); + }, { once: true }); + }), + }); + await assert.rejects( + () => client.requestJson("/slow", { session: false }), + (error) => { + assert.equal(error instanceof AuthorityHttpError, true); + assert.equal(error.category, "timeout"); + assert.equal(error.code, "timeout"); + return true; + }, + ); +} + +console.log("authority-http-client tests passed"); diff --git a/tests/authority-jobs.mjs b/tests/authority-jobs.mjs index b0d1372..562996d 100644 --- a/tests/authority-jobs.mjs +++ b/tests/authority-jobs.mjs @@ -135,6 +135,9 @@ assert.equal(submitted.idempotencyKey, idempotencyKey); const completed = await adapter.waitForCompletion(submitted.id, { timeoutMs: 1000 }); assert.equal(completed.status, "completed"); assert.equal(completed.success, true); +assert.equal(completed.waitDiagnostics.mode, "poll"); +assert.equal(completed.waitDiagnostics.pollCount, 1); +assert.equal(completed.waitDiagnostics.terminal, true); const page = await adapter.listPage({ limit: 10 }); assert.equal(page.jobs.length, 1); @@ -310,6 +313,36 @@ assert.equal(timedOutJob.status, "timeout"); assert.equal(timedOutJob.terminal, true); assert.equal(timedOutJob.success, false); +let adapterTimeoutPolls = 0; +const timeoutAdapter = createAuthorityJobAdapter( + { + authorityBaseUrl: "/api/plugins/authority", + authorityJobPollIntervalMs: 1, + authorityJobPollMaxIntervalMs: 2, + authorityJobPollBackoffFactor: 2, + }, + { + jobClient: { + async get(payload = {}) { + adapterTimeoutPolls += 1; + return { + job: { + id: payload.jobId, + status: "running", + progress: 0.4, + }, + }; + }, + }, + }, +); +const adapterTimedOutJob = await timeoutAdapter.waitForCompletion("job-wait-timeout", { timeoutMs: 1 }); +assert.equal(adapterTimedOutJob.status, "timeout"); +assert.equal(adapterTimedOutJob.waitDiagnostics.mode, "poll"); +assert.equal(adapterTimedOutJob.waitDiagnostics.pollCount >= 1, true); +assert.equal(adapterTimedOutJob.waitDiagnostics.lastStatus, "running"); +assert.equal(adapterTimeoutPolls >= 1, true); + const streamingClient = { async streamJob(payload) { return (async function* () { diff --git a/tests/authority-vector-primary.mjs b/tests/authority-vector-primary.mjs index c5dbd42..9cfd235 100644 --- a/tests/authority-vector-primary.mjs +++ b/tests/authority-vector-primary.mjs @@ -72,7 +72,18 @@ function createMockTriviumClient({ failBulkUpsert = false } = {}) { calls, async purge(payload) { calls.push(["purge", payload]); - return { ok: true }; + return { + ok: true, + diagnostics: { + operation: "purge", + pageSize: payload.purgePageSize || 200, + maxPages: payload.purgeMaxPages || 1000, + pages: 1, + scanned: 0, + deleted: 0, + truncated: false, + }, + }; }, async bulkUpsert(payload) { calls.push(["bulkUpsert", payload]); @@ -174,6 +185,13 @@ assert.equal(isAuthorityVectorConfig(config), true); const linkCall = triviumClient.calls.find(([name]) => name === "linkMany"); assert.equal(linkCall?.[1]?.links?.[0]?.fromId, "node-a"); assert.equal(linkCall?.[1]?.links?.[0]?.toId, "node-b"); + assert.equal(result.timings.authorityDiagnostics.purge.operation, "purge"); + assert.equal(result.timings.authorityDiagnostics.upsert.operation, "bulkUpsert"); + assert.equal(result.timings.authorityDiagnostics.upsert.chunks.length, 2); + assert.equal(result.timings.authorityDiagnostics.upsert.chunks.every((chunk) => chunk.ok), true); + assert.ok(result.timings.authorityDiagnostics.upsert.totalBytes > 0); + assert.equal(result.timings.authorityDiagnostics.link.operation, "linkMany"); + assert.equal(result.timings.authorityDiagnostics.link.totalItems, 1); } { @@ -230,11 +248,15 @@ assert.equal(isAuthorityVectorConfig(config), true); archived: false, ownerKeys: ["character:Alice"], }, + candidateIds: ["node-a"], + searchText: "Alice archive", }); assert.deepEqual(filteredIds, ["node-a", "node-b"]); const filterCall = triviumClient.calls.find(([name]) => name === "filterWhere"); assert.equal(filterCall?.[1]?.collectionId, "authority-filter"); assert.equal(filterCall?.[1]?.filters?.ownerKeys?.[0], "character:Alice"); + assert.deepEqual(filterCall?.[1]?.candidateIds, ["node-a"]); + assert.equal(filterCall?.[1]?.searchText, "Alice archive"); } { diff --git a/vector/authority-vector-primary-adapter.js b/vector/authority-vector-primary-adapter.js index f8ded38..5ebc518 100644 --- a/vector/authority-vector-primary-adapter.js +++ b/vector/authority-vector-primary-adapter.js @@ -11,6 +11,8 @@ export const AUTHORITY_VECTOR_SOURCE = "authority-trivium"; const DEFAULT_AUTHORITY_TRIVIUM_DATABASE = "st_bme_vectors"; const DEFAULT_AUTHORITY_VECTOR_CHUNK_SIZE = 1000; const MAX_AUTHORITY_VECTOR_CHUNK_SIZE = 2000; +const DEFAULT_AUTHORITY_PURGE_PAGE_SIZE = 200; +const DEFAULT_AUTHORITY_PURGE_MAX_PAGES = 1000; const DEFAULT_AUTHORITY_EMBEDDING_BACKEND_SOURCE = "openai"; function clampInteger(value, fallback, min, max) { @@ -23,6 +25,17 @@ function toArray(value) { return Array.isArray(value) ? value : []; } +function nowMs() { + if (typeof performance?.now === "function") { + return performance.now(); + } + return Date.now(); +} + +function roundMs(value) { + return Math.round((Number(value) || 0) * 10) / 10; +} + function clonePlain(value, fallbackValue = null) { if (value == null) return fallbackValue; if (typeof globalThis.structuredClone === "function") { @@ -56,6 +69,26 @@ function normalizePositiveInteger(value, fallback = 0) { return Math.floor(parsed); } +function estimateJsonBytes(value = null) { + try { + const text = JSON.stringify(value ?? null); + if (typeof TextEncoder === "function") { + return new TextEncoder().encode(text).length; + } + return text.length; + } catch { + return 0; + } +} + +function isPlainObject(value = null) { + return Boolean(value && typeof value === "object" && !Array.isArray(value)); +} + +function hasPlainKeys(value = null) { + return isPlainObject(value) && Object.keys(value).length > 0; +} + function normalizeOpenAICompatibleBaseUrl(value) { return String(value || "") .trim() @@ -329,6 +362,18 @@ export function normalizeAuthorityVectorConfig(settings = {}, overrides = {}) { 1, MAX_AUTHORITY_VECTOR_CHUNK_SIZE, ), + purgePageSize: clampInteger( + source.authorityTriviumPurgePageSize ?? source.authorityVectorPurgePageSize, + DEFAULT_AUTHORITY_PURGE_PAGE_SIZE, + 1, + 1000, + ), + purgeMaxPages: clampInteger( + source.authorityTriviumPurgeMaxPages ?? source.authorityVectorPurgeMaxPages, + DEFAULT_AUTHORITY_PURGE_MAX_PAGES, + 1, + 100000, + ), timeoutMs: Math.max(0, Number(source.timeoutMs || 0) || 0), failOpen: source.authorityVectorFailOpen !== false && source.failOpen !== false, ...overrides, @@ -347,6 +392,8 @@ export class AuthorityTriviumHttpClient { dtype: String(options.dtype || "").trim(), syncMode: String(options.syncMode || "").trim(), storageMode: String(options.storageMode || "").trim(), + purgePageSize: clampInteger(options.purgePageSize, DEFAULT_AUTHORITY_PURGE_PAGE_SIZE, 1, 1000), + purgeMaxPages: clampInteger(options.purgeMaxPages, DEFAULT_AUTHORITY_PURGE_MAX_PAGES, 1, 100000), }; this.http = new AuthorityHttpClient({ ...options, @@ -386,15 +433,34 @@ export class AuthorityTriviumHttpClient { async purge(payload = {}) { const namespace = getNamespace(payload); const openOptions = this.buildOpenOptions(payload); + const pageSize = clampInteger( + payload.pageSize ?? payload.limit ?? payload.purgePageSize ?? this.config.purgePageSize, + DEFAULT_AUTHORITY_PURGE_PAGE_SIZE, + 1, + 1000, + ); + const maxPages = clampInteger( + payload.maxPages ?? payload.purgeMaxPages ?? this.config.purgeMaxPages, + DEFAULT_AUTHORITY_PURGE_MAX_PAGES, + 1, + 100000, + ); + const startedAt = nowMs(); let cursor = ""; let deleted = 0; let scanned = 0; - for (let pageIndex = 0; pageIndex < 100; pageIndex++) { + let pages = 0; + let truncated = false; + for (let pageIndex = 0; pageIndex < maxPages; pageIndex++) { const page = await this.requestV06("/trivium/list-mappings", { ...openOptions, namespace, - page: { cursor, limit: 200 }, + page: { + ...(cursor ? { cursor } : {}), + limit: pageSize, + }, }); + pages += 1; const mappings = toArray(page?.mappings); if (!mappings.length && !page?.page?.hasMore) break; scanned += mappings.length; @@ -411,8 +477,28 @@ export class AuthorityTriviumHttpClient { if (!page?.page?.hasMore) break; cursor = String(page?.page?.nextCursor || ""); if (!cursor) break; + if (pageIndex === maxPages - 1) truncated = true; } - return { ok: true, scanned, deleted }; + return { + ok: !truncated, + scanned, + deleted, + pages, + truncated, + nextCursor: truncated ? cursor : "", + diagnostics: { + operation: "purge", + namespace, + pageSize, + maxPages, + pages, + scanned, + deleted, + truncated, + nextCursor: truncated ? cursor : "", + totalMs: roundMs(nowMs() - startedAt), + }, + }; } async bulkUpsert(payload = {}) { @@ -506,12 +592,21 @@ export class AuthorityTriviumHttpClient { async filterWhere(payload = {}) { const namespace = getNamespace(payload); + const filters = payload.filters || payload.filter || payload.where || null; + const payloadFilter = payload.payloadFilter || filters; + const candidateIds = toArray(payload.candidateIds).map(normalizeRecordId).filter(Boolean); + const query = String(payload.query || payload.searchText || "").trim(); const result = await this.requestV06("/trivium/list-mappings", { ...this.buildOpenOptions(payload), namespace, page: { + ...(payload.cursor ? { cursor: String(payload.cursor) } : {}), limit: Number(payload.limit || payload.topK || payload.pageSize || 100) || 100, }, + ...(hasPlainKeys(filters) ? { filters, where: filters } : {}), + ...(hasPlainKeys(payloadFilter) ? { payloadFilter } : {}), + ...(candidateIds.length ? { candidateIds } : {}), + ...(query ? { query, searchText: query } : {}), }); return { items: toArray(result?.mappings) }; } @@ -598,21 +693,44 @@ export async function purgeAuthorityTriviumNamespace(config = {}, options = {}) namespace: options.namespace, collectionId: options.collectionId, chatId: options.chatId, + purgePageSize: options.purgePageSize, + purgeMaxPages: options.purgeMaxPages, }); } export async function deleteAuthorityTriviumNodes(config = {}, nodeIds = [], options = {}) { const ids = toArray(nodeIds).map(normalizeRecordId).filter(Boolean); - if (!ids.length) return { deleted: 0 }; + if (!ids.length) { + return { + deleted: 0, + diagnostics: { + operation: "deleteMany", + requested: 0, + deleted: 0, + totalMs: 0, + }, + }; + } throwIfAborted(options.signal); const client = createAuthorityTriviumClient(config, options); - return await callClient(client, ["deleteMany", "deleteNodes"], "deleteMany", { + const startedAt = nowMs(); + const result = await callClient(client, ["deleteMany", "deleteNodes"], "deleteMany", { namespace: options.namespace, collectionId: options.collectionId, chatId: options.chatId, ids, externalIds: ids, }); + return { + ...result, + deleted: Number(result?.deleted ?? result?.successCount ?? ids.length) || 0, + diagnostics: { + operation: "deleteMany", + requested: ids.length, + deleted: Number(result?.deleted ?? result?.successCount ?? ids.length) || 0, + totalMs: roundMs(nowMs() - startedAt), + }, + }; } export async function filterAuthorityTriviumNodes(config = {}, options = {}) { @@ -642,37 +760,122 @@ export async function filterAuthorityTriviumNodes(config = {}, options = {}) { export async function upsertAuthorityTriviumEntries(graph, config = {}, entries = [], options = {}) { const items = buildAuthorityVectorItems(graph, entries, options); - if (!items.length) return { upserted: 0 }; + if (!items.length) { + return { + upserted: 0, + diagnostics: { + operation: "bulkUpsert", + totalItems: 0, + chunkSize: 0, + chunks: [], + totalBytes: 0, + totalMs: 0, + }, + }; + } throwIfAborted(options.signal); const client = createAuthorityTriviumClient(config, options); const chunkSize = clampInteger(config.chunkSize, DEFAULT_AUTHORITY_VECTOR_CHUNK_SIZE, 1, MAX_AUTHORITY_VECTOR_CHUNK_SIZE); let upserted = 0; + let totalBytes = 0; + const chunks = []; + const startedAt = nowMs(); for (let index = 0; index < items.length; index += chunkSize) { throwIfAborted(options.signal); const chunk = items.slice(index, index + chunkSize); - await callClient(client, ["bulkUpsert", "upsertMany", "upsert"], "bulkUpsert", { - namespace: options.namespace, - collectionId: options.collectionId, - chatId: options.chatId, - items: chunk, - }); - upserted += chunk.length; + const chunkStartedAt = nowMs(); + const estimatedBytes = estimateJsonBytes(chunk); + totalBytes += estimatedBytes; + try { + const result = await callClient(client, ["bulkUpsert", "upsertMany", "upsert"], "bulkUpsert", { + namespace: options.namespace, + collectionId: options.collectionId, + chatId: options.chatId, + items: chunk, + }); + const successCount = Number(result?.successCount ?? result?.upserted ?? chunk.length) || chunk.length; + upserted += successCount; + chunks.push({ + index: chunks.length, + offset: index, + itemCount: chunk.length, + upserted: successCount, + vectorDim: normalizeVector(chunk[0]?.vector || chunk[0]?.embedding).length, + estimatedBytes, + durationMs: roundMs(nowMs() - chunkStartedAt), + ok: true, + }); + } catch (error) { + chunks.push({ + index: chunks.length, + offset: index, + itemCount: chunk.length, + upserted: 0, + vectorDim: normalizeVector(chunk[0]?.vector || chunk[0]?.embedding).length, + estimatedBytes, + durationMs: roundMs(nowMs() - chunkStartedAt), + ok: false, + error: error?.message || String(error), + }); + error.authorityDiagnostics = { + operation: "bulkUpsert", + totalItems: items.length, + chunkSize, + chunks, + totalBytes, + totalMs: roundMs(nowMs() - startedAt), + }; + throw error; + } } - return { upserted }; + return { + upserted, + diagnostics: { + operation: "bulkUpsert", + totalItems: items.length, + chunkSize, + chunks, + totalBytes, + totalMs: roundMs(nowMs() - startedAt), + }, + }; } export async function syncAuthorityTriviumLinks(graph, config = {}, options = {}) { const links = buildAuthorityLinkItems(graph, options); - if (!links.length) return { linked: 0 }; + if (!links.length) { + return { + linked: 0, + diagnostics: { + operation: "linkMany", + totalItems: 0, + estimatedBytes: 0, + totalMs: 0, + }, + }; + } throwIfAborted(options.signal); const client = createAuthorityTriviumClient(config, options); - await callClient(client, ["linkMany", "upsertLinks"], "linkMany", { + const startedAt = nowMs(); + const estimatedBytes = estimateJsonBytes(links); + const result = await callClient(client, ["linkMany", "upsertLinks"], "linkMany", { namespace: options.namespace, collectionId: options.collectionId, chatId: options.chatId, links, }); - return { linked: links.length }; + const linked = Number(result?.linked ?? result?.successCount ?? links.length) || links.length; + return { + ...result, + linked, + diagnostics: { + operation: "linkMany", + totalItems: links.length, + linked, + estimatedBytes, + totalMs: roundMs(nowMs() - startedAt), + }, + }; } export async function queryAuthorityTriviumNeighbors(config = {}, nodeIds = [], options = {}) { diff --git a/vector/vector-index.js b/vector/vector-index.js index fe1dc10..bbccd90 100644 --- a/vector/vector-index.js +++ b/vector/vector-index.js @@ -764,6 +764,10 @@ export async function syncGraphVectorIndex( let authorityDeleteMs = 0; let authorityUpsertMs = 0; let authorityLinkMs = 0; + let authorityPurgeDiagnostics = null; + let authorityDeleteDiagnostics = null; + let authorityUpsertDiagnostics = null; + let authorityLinkDiagnostics = null; let embedBatchMs = 0; let deletedHashCount = 0; let deletedNodeCount = 0; @@ -801,17 +805,22 @@ export async function syncGraphVectorIndex( throw new Error(`Authority Trivium embedding failed for ${embeddingResult.failures} item(s)`); } const purgeStartedAt = nowMs(); - await purgeAuthorityTriviumNamespace(config, authorityOptions); + const purgeResult = await purgeAuthorityTriviumNamespace(config, authorityOptions); authorityPurgeMs += nowMs() - purgeStartedAt; + authorityPurgeDiagnostics = purgeResult?.diagnostics || null; + if (purgeResult?.truncated) { + throw new Error(`Authority Trivium purge truncated after ${purgeResult.pages || 0} page(s)`); + } resetVectorMappings(graph, config, effectiveChatId); const upsertStartedAt = nowMs(); - await upsertAuthorityTriviumEntries( + const upsertResult = await upsertAuthorityTriviumEntries( graph, config, desiredEntries, authorityOptions, ); authorityUpsertMs += nowMs() - upsertStartedAt; + authorityUpsertDiagnostics = upsertResult?.diagnostics || null; for (const entry of desiredEntries) { state.hashToNodeId[entry.hash] = entry.nodeId; state.nodeToHash[entry.nodeId] = entry.hash; @@ -861,16 +870,18 @@ export async function syncGraphVectorIndex( } deletedNodeCount = nodeIdsToDelete.length; const deleteStartedAt = nowMs(); - await deleteAuthorityTriviumNodes(config, nodeIdsToDelete, authorityOptions); + const deleteResult = await deleteAuthorityTriviumNodes(config, nodeIdsToDelete, authorityOptions); authorityDeleteMs += nowMs() - deleteStartedAt; + authorityDeleteDiagnostics = deleteResult?.diagnostics || null; const upsertStartedAt = nowMs(); - await upsertAuthorityTriviumEntries( + const upsertResult = await upsertAuthorityTriviumEntries( graph, config, entriesToUpsert, authorityOptions, ); authorityUpsertMs += nowMs() - upsertStartedAt; + authorityUpsertDiagnostics = upsertResult?.diagnostics || null; for (const entry of entriesToUpsert) { state.hashToNodeId[entry.hash] = entry.nodeId; @@ -880,8 +891,9 @@ export async function syncGraphVectorIndex( } const linkStartedAt = nowMs(); - await syncAuthorityTriviumLinks(graph, config, authorityOptions); + const linkResult = await syncAuthorityTriviumLinks(graph, config, authorityOptions); authorityLinkMs += nowMs() - linkStartedAt; + authorityLinkDiagnostics = linkResult?.diagnostics || null; for (const node of graph.nodes || []) { if (Array.isArray(node.embedding) && node.embedding.length > 0) { @@ -914,6 +926,12 @@ export async function syncGraphVectorIndex( authorityDeleteMs: roundMs(authorityDeleteMs), authorityUpsertMs: roundMs(authorityUpsertMs), authorityLinkMs: roundMs(authorityLinkMs), + authorityDiagnostics: { + purge: authorityPurgeDiagnostics, + delete: authorityDeleteDiagnostics, + upsert: error?.authorityDiagnostics || authorityUpsertDiagnostics, + link: authorityLinkDiagnostics, + }, totalMs: roundMs(nowMs() - syncStartedAt), updatedAt: Date.now(), }; @@ -1106,6 +1124,12 @@ export async function syncGraphVectorIndex( authorityDeleteMs: roundMs(authorityDeleteMs), authorityUpsertMs: roundMs(authorityUpsertMs), authorityLinkMs: roundMs(authorityLinkMs), + authorityDiagnostics: { + purge: authorityPurgeDiagnostics, + delete: authorityDeleteDiagnostics, + upsert: authorityUpsertDiagnostics, + link: authorityLinkDiagnostics, + }, embedBatchMs: roundMs(embedBatchMs), statsBuildMs: roundMs(statsBuildMs), deletedHashes: Math.max(0, Math.floor(deletedHashCount)),