Merge dev into main

This commit is contained in:
opencode
2026-05-15 09:59:09 +00:00
16 changed files with 1388 additions and 174 deletions

View File

@@ -239,6 +239,7 @@ export function onChatChangedController(runtime) {
runtime.setPendingHistoryRecoveryTimer(null);
runtime.setPendingHistoryRecoveryTrigger("");
runtime.clearPendingAutoExtraction?.();
runtime.clearPendingBackgroundVectorSync?.();
runtime.clearPendingGraphLoadRetry();
runtime.setSkipBeforeCombineRecallUntil(0);
runtime.setLastPreGenerationRecallKey("");

134
index.js
View File

@@ -257,6 +257,7 @@ import {
writePersistedRecallToUserMessage,
} from "./retrieval/recall-persistence.js";
import { resolveConfiguredTimeoutMs } from "./runtime/request-timeout.js";
import { createVectorSyncCoalescer as createImportedVectorSyncCoalescer } from "./runtime/vector-sync-coalescer.js";
import {
defaultSettings,
getPersistedSettingsSnapshot,
@@ -1308,6 +1309,45 @@ const backgroundMaintenanceQueue =
typeof createBackgroundMaintenanceQueue === "function"
? createBackgroundMaintenanceQueue()
: null;
// Background vector-sync coalescer. Uses the runtime-provided factory when
// available; otherwise installs an inert stand-in with the same surface so
// callers never have to null-check before invoking a method.
const backgroundVectorSyncCoalescer =
  typeof createImportedVectorSyncCoalescer === "function"
    ? createImportedVectorSyncCoalescer()
    : {
        // No-op fallback: nothing is ever tracked.
        clear() {},
        getActive() {
          return null;
        },
        getPending() {
          return null;
        },
        // Always reports the task as freshly scheduled, never coalesced.
        enqueue(task = {}) {
          const normalizedTask = { ...(task || {}), stale: false };
          return { scheduled: true, coalesced: false, task: normalizedTask };
        },
        // A task may start only if it exists and has not been marked stale.
        start(task = null) {
          return Boolean(task && !task.stale);
        },
        complete() {
          return true;
        },
        // Marks the task stale in place; reports whether a task was given.
        drop(task = null) {
          if (!task) return false;
          task.stale = true;
          return true;
        },
        // Stale when missing, flagged, or bound to a different chat id.
        isStale(task = null, chatId = "") {
          if (!task || task.stale) return true;
          return Boolean(
            chatId && task.chatId && String(chatId) !== String(task.chatId),
          );
        },
      };
const lastStatusToastAt = {};
let pendingRecallSendIntent = createRecallInputRecord();
let lastRecallSentUserMessage = createRecallInputRecord();
@@ -6967,7 +7007,6 @@ function shouldUseAuthorityGraphStore(settings = getSettings(), capability = aut
normalizedSettings.sqlPrimary &&
normalizedSettings.storageMode !== "local-primary" &&
normalizedSettings.storageMode !== "off" &&
normalizedCapability.serverPrimaryReady &&
normalizedCapability.storagePrimaryReady
);
}
@@ -14394,7 +14433,7 @@ function buildBatchPersistenceRecordFromPersistResult(persistResult = null) {
if (
accepted &&
["indexeddb", "opfs", "luker-chat-state"].includes(
["indexeddb", "opfs", "authority-sql", "luker-chat-state"].includes(
String(persistResult?.storageTier || ""),
)
) {
@@ -14449,6 +14488,7 @@ async function persistGraphToConfiguredDurableTier(
if (
persistenceEnvironment.hostProfile === "luker" &&
persistenceEnvironment.primaryStorageTier === "luker-chat-state" &&
canUseHostGraphChatStatePersistence(context)
) {
const chatStateResult = await persistGraphToHostChatState(context, {
@@ -16777,50 +16817,81 @@ async function syncVectorState({
function scheduleBackgroundVectorSync(task = null, settings = {}) {
const normalizedTask =
task && typeof task === "object" && !Array.isArray(task) ? task : {};
const range =
normalizedTask.range &&
Number.isFinite(Number(normalizedTask.range.start)) &&
Number.isFinite(Number(normalizedTask.range.end))
? {
start: Math.floor(Number(normalizedTask.range.start)),
end: Math.floor(Number(normalizedTask.range.end)),
}
: null;
const reason =
String(normalizedTask.reason || "background-vector-sync").trim() ||
"background-vector-sync";
const config = getEmbeddingConfig();
const chatId = normalizeChatIdCandidate(
normalizedTask.chatId || getCurrentChatId() || graphPersistenceState.chatId,
);
const mode =
String(
normalizedTask.mode ||
resolveMaintenancePostProcessConcurrency(settings).mode ||
"balanced",
).trim() || "balanced";
return enqueueBackgroundMaintenanceTask(
const coalesced = backgroundVectorSyncCoalescer.enqueue({
...normalizedTask,
chatId,
modelScope: getVectorModelScope(config),
mode,
reason:
String(normalizedTask.reason || "background-vector-sync").trim() ||
"background-vector-sync",
});
const scheduledTask = coalesced.task;
if (!coalesced.scheduled) {
return {
queued: true,
coalesced: true,
id: scheduledTask.id,
snapshot: updateBackgroundMaintenanceQueueState(
typeof backgroundMaintenanceQueue?.getSnapshot === "function"
? backgroundMaintenanceQueue.getSnapshot()
: null,
),
};
}
const queuedResult = enqueueBackgroundMaintenanceTask(
"vector-sync",
async () => {
setLastVectorStatus(
"后台向量同步中",
`${mode} 模式 · 正在同步提取后的向量索引`,
"running",
{ syncRuntime: false },
);
const result = await syncVectorState({ range });
if (result?.aborted) {
throw createAbortError(result.error || "后台向量同步已终止");
backgroundVectorSyncCoalescer.start(scheduledTask);
try {
const activeChatId = normalizeChatIdCandidate(getCurrentChatId());
if (backgroundVectorSyncCoalescer.isStale(scheduledTask, activeChatId)) {
return { skipped: true, reason: "stale-background-vector-sync" };
}
setLastVectorStatus(
"后台向量同步中",
`${scheduledTask.mode} 模式 · 正在同步提取后的向量索引`,
"running",
{ syncRuntime: false },
);
const result = await syncVectorState({ range: scheduledTask.range });
if (result?.aborted) {
throw createAbortError(result.error || "后台向量同步已终止");
}
if (result?.error) {
throw new Error(result.error);
}
saveGraphToChat({ reason: scheduledTask.reason });
return result;
} finally {
backgroundVectorSyncCoalescer.complete(scheduledTask);
}
if (result?.error) {
throw new Error(result.error);
}
saveGraphToChat({ reason });
return result;
},
settings,
{
id: String(normalizedTask.id || ""),
id: scheduledTask.id,
},
);
if (queuedResult?.queued !== true) {
backgroundVectorSyncCoalescer.drop?.(
scheduledTask,
queuedResult?.reason || "background-vector-sync-queue-rejected",
);
}
return queuedResult;
}
function hasPlanCommitChanges(planCommit = null) {
if (!planCommit || typeof planCommit !== "object") return false;
return [
@@ -22349,6 +22420,7 @@ function onChatChanged() {
clearGenerationRecallTransactionsForChat,
clearInjectionState,
clearPendingAutoExtraction,
clearPendingBackgroundVectorSync: () => backgroundVectorSyncCoalescer.clear("chat-changed"),
clearPendingGraphLoadRetry,
clearPendingHistoryMutationChecks,
clearCurrentGenerationTrivialSkip,

View File

@@ -6,6 +6,6 @@
"js": "index.js",
"css": "style.css",
"author": "Youzini",
"version": "6.3.0",
"version": "6.3.3",
"homePage": "https://github.com/Youzini-afk/ST-Bionic-Memory-Ecology"
}

View File

@@ -162,6 +162,26 @@ function readPayloadMessage(payload = {}, fallback = "") {
return String(payload.error || payload.message || payload.reason || fallback || "");
}
/**
 * Map an HTTP probe status (plus an optional response payload) onto a
 * coarse error category used by Authority capability reporting.
 * A category carried by the payload itself always takes precedence.
 * @param {number} status - HTTP status code from the probe response.
 * @param {?object} payload - Parsed response body; may carry `category`.
 * @returns {string} Category label, or "" when the status is not an error.
 */
function classifyAuthorityProbeStatus(status = 0, payload = null) {
  const explicitCategory = String(payload?.category || "").trim();
  if (explicitCategory) return explicitCategory;
  const code = Number(status || 0);
  switch (code) {
    case 408:
      return "timeout";
    case 401:
    case 403:
      return "permission";
    case 413:
      return "payload-too-large";
    case 429:
      return "rate-limit";
    default:
      break;
  }
  if (code >= 500) return "server";
  if (code >= 400) return "validation";
  return "";
}
/**
 * Derive an error category from a thrown probe error.
 * An explicit category on the error wins; aborts are treated as
 * timeouts; any other real error counts as a network failure.
 * @param {?Error} error - Error thrown while probing.
 * @returns {string} Category label, or "" when no error was given.
 */
function classifyAuthorityProbeError(error = null) {
  const explicit = String(error?.category || error?.errorCategory || "").trim();
  if (explicit) return explicit;
  if (String(error?.name || "") === "AbortError") return "timeout";
  if (!error) return "";
  return "network";
}
function buildAuthorityPermissionEvaluateRequests(settings = {}, readiness = {}, options = {}) {
const requests = [];
const sqlTarget = String(options.sqlTarget || settings.sqlTarget || "default");
@@ -198,6 +218,8 @@ async function verifyAuthorityDataPlane(baseUrl, fetchImpl, headers, settings =
reason: initStatus === 401 || initStatus === 403 ? "session-init-denied" : "session-init-failed",
lastError: readPayloadMessage(initPayload, `HTTP ${initStatus || "unknown"}`),
status: initStatus,
errorCategory: classifyAuthorityProbeStatus(initStatus, initPayload),
errorDomain: "authority",
};
}
@@ -231,6 +253,8 @@ async function verifyAuthorityDataPlane(baseUrl, fetchImpl, headers, settings =
reason: currentStatus === 401 || currentStatus === 403 ? "session-invalid" : "session-current-failed",
lastError: readPayloadMessage(currentPayload, `HTTP ${currentStatus || "unknown"}`),
status: currentStatus,
errorCategory: classifyAuthorityProbeStatus(currentStatus, currentPayload),
errorDomain: "authority",
};
}
@@ -259,6 +283,8 @@ async function verifyAuthorityDataPlane(baseUrl, fetchImpl, headers, settings =
reason: permissionStatus === 401 || permissionStatus === 403 ? "permission-denied" : "permission-evaluate-failed",
lastError: readPayloadMessage(permissionPayload, `HTTP ${permissionStatus || "unknown"}`),
status: permissionStatus,
errorCategory: classifyAuthorityProbeStatus(permissionStatus, permissionPayload),
errorDomain: "authority",
};
}
@@ -408,6 +434,8 @@ export function createDefaultAuthorityCapabilityState(overrides = {}) {
missingFeatures: ["sql.query", "sql.mutation", "trivium.search", "jobs", "blob-or-private-files"],
reason: "not-probed",
lastError: "",
errorCategory: "",
errorDomain: "",
endpoint: "",
status: 0,
latencyMs: 0,
@@ -459,6 +487,8 @@ export function normalizeAuthorityCapabilityState(input = {}, settings = {}) {
missingFeatures,
reason: String(source.reason || (healthy ? "ok" : "not-ready")),
lastError: String(source.lastError || ""),
errorCategory: String(source.errorCategory || ""),
errorDomain: String(source.errorDomain || ""),
endpoint: String(source.endpoint || ""),
status: clampInteger(source.status, 0, 0, 999),
latencyMs: Math.max(0, Number(source.latencyMs) || 0),
@@ -547,6 +577,7 @@ export async function probeAuthorityCapabilities(options = {}) {
let lastError = "";
let lastStatus = 0;
let lastErrorCategory = "";
for (const endpoint of buildAuthorityProbeUrls(settings.baseUrl)) {
const startedAt = readNowMs();
try {
@@ -555,6 +586,7 @@ export async function probeAuthorityCapabilities(options = {}) {
const status = Number(response?.status || 0);
lastStatus = status;
if (status === 404) continue;
const errorPayload = response?.ok ? null : await readResponsePayload(response);
if (status === 401 || status === 403) {
return normalizeAuthorityCapabilityState(
{
@@ -563,7 +595,9 @@ export async function probeAuthorityCapabilities(options = {}) {
sessionReady: false,
permissionReady: false,
reason: "permission-denied",
lastError: `HTTP ${status}`,
lastError: readPayloadMessage(errorPayload, `HTTP ${status}`),
errorCategory: classifyAuthorityProbeStatus(status, errorPayload),
errorDomain: "authority",
endpoint,
status,
latencyMs: normalizeLatencyMs(startedAt, finishedAt),
@@ -579,7 +613,9 @@ export async function probeAuthorityCapabilities(options = {}) {
installed: status > 0,
healthy: false,
reason: "http-error",
lastError: `HTTP ${status || "unknown"}`,
lastError: readPayloadMessage(errorPayload, `HTTP ${status || "unknown"}`),
errorCategory: classifyAuthorityProbeStatus(status, errorPayload),
errorDomain: "authority",
endpoint,
status,
latencyMs: normalizeLatencyMs(startedAt, finishedAt),
@@ -605,12 +641,16 @@ export async function probeAuthorityCapabilities(options = {}) {
let reason = missingFeatures.length ? "missing-required-features" : "ok";
let dataPlaneLastError = "";
let dataPlaneStatus = status;
let dataPlaneErrorCategory = "";
let dataPlaneErrorDomain = "";
if (healthy) {
const verified = await verifyAuthorityDataPlane(settings.baseUrl, fetchImpl, headers, settings, readiness, options);
sessionReady = verified.sessionReady;
permissionReady = verified.permissionReady;
dataPlaneStatus = Number(verified.status || status || 0);
dataPlaneLastError = String(verified.lastError || "");
dataPlaneErrorCategory = String(verified.errorCategory || "");
dataPlaneErrorDomain = String(verified.errorDomain || "");
if (verified.reason && verified.reason !== "ok") {
reason = verified.reason;
}
@@ -627,6 +667,8 @@ export async function probeAuthorityCapabilities(options = {}) {
missingFeatures,
reason,
lastError: dataPlaneLastError,
errorCategory: dataPlaneErrorCategory,
errorDomain: dataPlaneErrorDomain,
endpoint,
status: dataPlaneStatus,
latencyMs: normalizeLatencyMs(startedAt, finishedAt),
@@ -637,6 +679,8 @@ export async function probeAuthorityCapabilities(options = {}) {
);
} catch (error) {
lastError = error?.message || String(error);
lastStatus = Number(error?.status || lastStatus || 0);
lastErrorCategory = classifyAuthorityProbeError(error);
}
}
@@ -646,6 +690,8 @@ export async function probeAuthorityCapabilities(options = {}) {
healthy: false,
reason: lastStatus === 404 ? "not-installed" : "probe-failed",
lastError,
errorCategory: lastErrorCategory || classifyAuthorityProbeStatus(lastStatus),
errorDomain: lastErrorCategory || lastStatus ? "authority" : "",
status: lastStatus,
lastProbeAt: nowMs,
updatedAt: new Date(nowMs).toISOString(),

View File

@@ -0,0 +1,133 @@
/**
 * Coerce an arbitrary range-like value into a normalized integer range.
 * Both bounds are floored, and start/end are swapped if given out of
 * order, so the result always satisfies start <= end.
 * @param {?{start:number,end:number}} range - Candidate range.
 * @returns {?{start:number,end:number}} Normalized range, or null when
 *   either bound is missing or non-numeric.
 */
export function normalizeVectorSyncRange(range = null) {
  const rawStart = Number(range?.start);
  const rawEnd = Number(range?.end);
  if (!Number.isFinite(rawStart) || !Number.isFinite(rawEnd)) return null;
  const start = Math.floor(rawStart);
  const end = Math.floor(rawEnd);
  return start <= end ? { start, end } : { start: end, end: start };
}
/**
 * Union two vector-sync ranges. Both inputs must normalize to valid
 * ranges; if either does not, null is returned (a null range downstream
 * means "sync everything", so a partial merge would be wrong).
 * @returns {?{start:number,end:number}} Covering range, or null.
 */
export function mergeVectorSyncRange(current = null, next = null) {
  const left = normalizeVectorSyncRange(current);
  const right = normalizeVectorSyncRange(next);
  if (!left || !right) return null;
  const start = Math.min(left.start, right.start);
  const end = Math.max(left.end, right.end);
  return { start, end };
}
/**
 * Build an internal, normalized vector-sync task record from
 * caller-supplied fields. Missing values fall back to safe defaults and
 * creation/update timestamps are stamped with the current time.
 * @param {object} task - Raw task fields from the caller.
 * @returns {object} Normalized task record (never stale on creation).
 */
function createTaskRecord(task = {}) {
  const now = Date.now();
  const trimmedReason = String(task.reason || "background-vector-sync").trim();
  const trimmedMode = String(task.mode || "balanced").trim();
  return {
    id: String(task.id || `vector-sync:${now}`),
    chatId: String(task.chatId || "").trim(),
    modelScope: String(task.modelScope || "").trim(),
    range: normalizeVectorSyncRange(task.range),
    reason: trimmedReason || "background-vector-sync",
    mode: trimmedMode || "balanced",
    stale: false,
    requestedAt: now,
    updatedAt: now,
  };
}
/**
 * Two task records can be coalesced when both exist, the existing (left)
 * record is not stale, and they target the same chat and model scope.
 * @returns {boolean} True when `right` may be merged into `left`.
 */
function canMergeTask(left = null, right = null) {
  if (!left || !right || left.stale) return false;
  if (left.chatId !== right.chatId) return false;
  return left.modelScope === right.modelScope;
}
/**
 * Fold an incoming task into an existing queued record, mutating it in
 * place: ranges are unioned (null if either side lacks one, i.e. full
 * sync), distinct reasons are joined with "+", the newest mode wins, and
 * the update timestamp is refreshed.
 * @returns {object} The mutated target record.
 */
function mergeTaskInto(target, incoming) {
  target.range = mergeVectorSyncRange(target.range, incoming.range);
  if (target.reason !== incoming.reason) {
    target.reason = `${target.reason}+${incoming.reason}`;
  }
  if (incoming.mode) {
    target.mode = incoming.mode;
  }
  target.updatedAt = Date.now();
  return target;
}
/**
 * Flag a task record as stale so in-flight consumers can skip it,
 * recording why it was invalidated. Safe to call with null.
 * @param {?object} task - Task record to invalidate (mutated in place).
 * @param {string} reason - Why the task was invalidated.
 */
function markStale(task = null, reason = "stale") {
  if (!task) return;
  const why = String(reason || "stale");
  task.stale = true;
  task.clearReason = why;
}
/**
 * Create a coalescer tracking at most one active and one pending
 * background vector-sync task.
 *
 * Tasks for the same chat + model scope are merged into the pending slot
 * instead of queued repeatedly; a chat switch (or explicit clear) marks
 * everything stale so already-started work can bail out cheaply.
 *
 * Fix: the original `enqueue` special-cased a mergeable ACTIVE task, but
 * both the special case and the fall-through executed byte-identical
 * code (merge into pending, else replace pending). The redundant outer
 * branch has been collapsed; behavior is unchanged.
 */
export function createVectorSyncCoalescer() {
  let active = null;
  let pending = null;
  return {
    /** Invalidate both slots, tagging each stale record with `reason`. */
    clear(reason = "clear") {
      markStale(active, reason);
      markStale(pending, reason);
      active = null;
      pending = null;
    },
    /** @returns {?object} The task currently executing, if any. */
    getActive() {
      return active;
    },
    /** @returns {?object} The task waiting to execute, if any. */
    getPending() {
      return pending;
    },
    /**
     * Register a task. If it can merge into the pending slot it is
     * coalesced ({ scheduled: false }); otherwise the previous pending
     * task is marked stale and replaced, and the caller must schedule
     * queue work ({ scheduled: true }).
     * @returns {{scheduled:boolean,coalesced:boolean,task:object}}
     */
    enqueue(task = {}) {
      const incoming = createTaskRecord(task);
      if (canMergeTask(pending, incoming)) {
        mergeTaskInto(pending, incoming);
        return { scheduled: false, coalesced: true, task: pending };
      }
      markStale(pending, "replaced");
      pending = incoming;
      return { scheduled: true, coalesced: false, task: pending };
    },
    /**
     * Promote a task to the active slot; refuses missing/stale tasks.
     * @returns {boolean} True when the task became active.
     */
    start(task = null) {
      if (!task || task.stale) return false;
      if (pending === task) pending = null;
      active = task;
      return true;
    },
    /**
     * Release the active slot. With no argument (or the owning task)
     * the slot is cleared; a non-owning task is rejected.
     * @returns {boolean} True when the slot was released.
     */
    complete(task = null) {
      if (task && active !== task) return false;
      active = null;
      return true;
    },
    /**
     * Invalidate a specific task wherever it currently sits.
     * @returns {boolean} True when the task was found and dropped.
     */
    drop(task = null, reason = "dropped") {
      if (!task) return false;
      if (task !== pending && task !== active) return false;
      markStale(task, reason);
      if (pending === task) pending = null;
      if (active === task) active = null;
      return true;
    },
    /**
     * A task is stale when missing, explicitly invalidated, or bound to
     * a different chat than the one given as current.
     * @returns {boolean}
     */
    isStale(task = null, chatId = "") {
      if (!task || task.stale) return true;
      const currentChatId = String(chatId || "").trim();
      return Boolean(currentChatId && task.chatId && currentChatId !== task.chatId);
    },
  };
}

View File

@@ -18,6 +18,9 @@ const DEFAULT_AUTHORITY_SQL_DATABASE = "default";
const AUTHORITY_SQL_QUERY_ENDPOINT = "/sql/query";
const AUTHORITY_SQL_EXEC_ENDPOINT = "/sql/exec";
const AUTHORITY_SQL_TRANSACTION_ENDPOINT = "/sql/transaction";
const AUTHORITY_SQL_TRANSACTION_BATCH_SIZE = 150;
const AUTHORITY_SQL_TRANSACTION_MAX_REQUEST_BYTES = 1024 * 1024;
const AUTHORITY_SQL_TRANSACTION_SAFE_REQUEST_BYTES = 512 * 1024;
const AUTHORITY_TABLES = Object.freeze({
meta: "st_bme_graph_meta",
@@ -85,6 +88,108 @@ function estimatePersistPayloadBytes(value = null) {
}
}
/**
 * Estimate the UTF-8 byte size of a value's JSON serialization.
 * Unserializable values (e.g. cyclic structures) count as 0 bytes; if
 * TextEncoder is unavailable, fall back to a conservative
 * 3-bytes-per-UTF-16-unit upper bound.
 * @param {*} value - Any JSON-serializable value (nullish becomes null).
 * @returns {number} Estimated byte count.
 */
function measureJsonBytes(value = null) {
  let json = "";
  try {
    json = JSON.stringify(value ?? null);
  } catch {
    json = "";
  }
  if (typeof TextEncoder !== "function") {
    return json.length * 3;
  }
  return new TextEncoder().encode(json).byteLength;
}
/**
 * Clamp a caller-supplied byte budget for SQL transaction requests.
 * Non-numeric or non-positive inputs fall back to the safe default;
 * valid inputs are floored and kept at or above 1 KiB.
 * @param {*} value - Candidate byte budget.
 * @returns {number} Usable byte budget.
 */
function normalizeSqlTransactionByteBudget(value) {
  const parsed = Number(value);
  const usable = Number.isFinite(parsed) && parsed > 0;
  if (!usable) return AUTHORITY_SQL_TRANSACTION_SAFE_REQUEST_BYTES;
  return Math.max(1024, Math.floor(parsed));
}
/**
 * Heuristically detect "payload too large" failures from the Authority
 * SQL endpoint, covering HTTP 413, category/code markers, and common
 * error-message phrasings, on either the error itself or its payload.
 * @param {?object} error - Thrown error or error-like object.
 * @returns {boolean} True when the failure looks size-related.
 */
function isAuthorityPayloadTooLargeError(error = null) {
  const status = Number(error?.status || error?.payload?.status || 0);
  if (status === 413) return true;
  const category = String(error?.category || error?.payload?.category || "").toLowerCase();
  if (category === "payload-too-large" || category === "limit") return true;
  const code = String(error?.code || error?.payload?.code || error?.payload?.reason || "").toLowerCase();
  if (code.includes("length_limit") || code.includes("payload")) return true;
  const message = String(error?.message || error?.payload?.error || error?.payload?.message || "").toLowerCase();
  return (
    message.includes("length limit") ||
    message.includes("payload too large") ||
    message.includes("request entity too large")
  );
}
/**
 * Build a terminal (non-retryable) error for a single SQL statement
 * whose payload exceeds the Authority transaction size limit. Used to
 * stop the 413 bisection from retrying an irreducible statement forever.
 * @param {?object} statement - The oversized statement ({ sql, params }).
 * @param {?Error} cause - The original transport error, preserved on `.cause`.
 * @returns {Error} Error tagged terminal/nonRetryable with size metadata.
 */
function createTerminalAuthoritySqlPayloadError(statement = null, cause = null) {
  const estimatedBytes = estimateAuthoritySqlTransactionRequestBytes(
    [statement],
    DEFAULT_AUTHORITY_SQL_DATABASE,
  );
  const error = new Error(
    `Authority SQL transaction payload is too large for a single graph persistence statement (${estimatedBytes} bytes estimated); refusing endless retry`,
  );
  Object.assign(error, {
    name: "AuthoritySqlPayloadTooLargeError",
    status: Number(cause?.status || 413),
    code: "authority_sql_payload_too_large",
    category: "payload-too-large",
    terminal: true,
    nonRetryable: true,
    estimatedBytes,
    cause,
  });
  return error;
}
/**
 * Convert a named-parameter statement into the positional wire shape
 * ({ statement, params }) expected by the Authority /sql/transaction
 * endpoint.
 * @param {object} statement - Statement with `sql` and named `params`.
 * @returns {{statement:string,params:Array}} Positional wire statement.
 */
function normalizeAuthorityTransactionStatement(statement = {}) {
  const rawSql = String(statement?.sql || "");
  const { sql, params } = convertNamedParamsToPositional(rawSql, statement?.params || {});
  return { statement: sql, params };
}
/**
 * Estimate the JSON request size in bytes of a /sql/transaction call for
 * the given statements, mirroring the exact body shape the client sends
 * (database + positional wire statements).
 * @param {Array} statements - Candidate statements (entries without `sql` are dropped).
 * @param {string} database - Target database id.
 * @returns {number} Estimated request byte count.
 */
function estimateAuthoritySqlTransactionRequestBytes(statements = [], database = DEFAULT_AUTHORITY_SQL_DATABASE) {
  const normalizedDatabase = normalizeRecordId(database) || DEFAULT_AUTHORITY_SQL_DATABASE;
  const wireStatements = toArray(statements)
    .filter((statement) => statement?.sql)
    .map(normalizeAuthorityTransactionStatement);
  return measureJsonBytes({ database: normalizedDatabase, statements: wireStatements });
}
/**
 * Split SQL statements into transaction chunks bounded by both a
 * statement-count cap and an estimated request-byte budget. A single
 * statement that alone exceeds the budget still becomes its own chunk;
 * the 413 bisection downstream decides whether it is irreducible.
 * @param {Array} statements - Candidate statements (entries without `sql` are dropped).
 * @param {object} options - { maxStatements, maxBytes, database }.
 * @returns {Array<Array>} Ordered list of statement chunks.
 */
function buildAuthoritySqlTransactionChunks(statements = [], options = {}) {
  const usable = toArray(statements).filter((statement) => statement?.sql);
  const maxStatements = Math.max(
    1,
    Math.floor(Number(options.maxStatements) || AUTHORITY_SQL_TRANSACTION_BATCH_SIZE),
  );
  const maxBytes = normalizeSqlTransactionByteBudget(options.maxBytes);
  const database = normalizeRecordId(options.database) || DEFAULT_AUTHORITY_SQL_DATABASE;
  const chunks = [];
  let currentChunk = [];
  for (const statement of usable) {
    if (currentChunk.length > 0) {
      // Flush the open chunk when appending this statement would exceed
      // either the statement cap or the byte budget.
      const candidate = currentChunk.concat(statement);
      const candidateBytes = estimateAuthoritySqlTransactionRequestBytes(candidate, database);
      if (candidate.length > maxStatements || candidateBytes > maxBytes) {
        chunks.push(currentChunk);
        currentChunk = [];
      }
    }
    currentChunk.push(statement);
  }
  if (currentChunk.length > 0) {
    chunks.push(currentChunk);
  }
  return chunks;
}
function toPlainData(value, fallbackValue = null) {
if (value == null) return fallbackValue;
if (typeof globalThis.structuredClone === "function") {
@@ -390,13 +495,7 @@ export class AuthoritySqlHttpClient {
database: this.database,
statements: toArray(statements)
.filter((statement) => statement?.sql)
.map((statement) => {
const positional = convertNamedParamsToPositional(String(statement.sql || ""), statement.params || {});
return {
statement: positional.sql,
params: positional.params,
};
}),
.map(normalizeAuthorityTransactionStatement),
});
}
@@ -1039,15 +1138,11 @@ export class AuthorityGraphStore {
const normalizedStatements = toArray(statements).filter((statement) => statement?.sql);
if (!normalizedStatements.length) return null;
const BATCH_SIZE = 150;
if (typeof this.sqlClient?.transaction === "function") {
if (normalizedStatements.length <= BATCH_SIZE) {
return await this.sqlClient.transaction(normalizedStatements);
}
let lastResult = null;
for (let i = 0; i < normalizedStatements.length; i += BATCH_SIZE) {
const batch = normalizedStatements.slice(i, i + BATCH_SIZE);
lastResult = await this.sqlClient.transaction(batch);
const chunks = this._buildTransactionChunks(normalizedStatements);
for (const batch of chunks) {
lastResult = await this._executeTransactionChunk(batch);
}
return lastResult;
}
@@ -1078,6 +1173,42 @@ export class AuthorityGraphStore {
return result;
}
_buildTransactionChunks(statements = []) {
const budget = normalizeSqlTransactionByteBudget(
this.options?.sqlTransactionMaxBytes ?? this.options?.authoritySqlTransactionMaxBytes,
);
const maxBytes = Math.min(
AUTHORITY_SQL_TRANSACTION_MAX_REQUEST_BYTES - 64 * 1024,
Math.max(1024, budget),
);
return buildAuthoritySqlTransactionChunks(statements, {
database: this.sqlClient?.database || DEFAULT_AUTHORITY_SQL_DATABASE,
maxStatements: this.options?.sqlTransactionBatchSize ?? AUTHORITY_SQL_TRANSACTION_BATCH_SIZE,
maxBytes,
});
}
async _executeTransactionChunk(batch = []) {
const normalizedBatch = toArray(batch).filter((statement) => statement?.sql);
if (!normalizedBatch.length) return null;
try {
return await this.sqlClient.transaction(normalizedBatch);
} catch (error) {
if (!isAuthorityPayloadTooLargeError(error)) {
throw error;
}
if (normalizedBatch.length <= 1) {
throw createTerminalAuthoritySqlPayloadError(normalizedBatch[0], error);
}
const mid = Math.max(1, Math.floor(normalizedBatch.length / 2));
const left = normalizedBatch.slice(0, mid);
const right = normalizedBatch.slice(mid);
const leftResult = await this._executeTransactionChunk(left);
const rightResult = await this._executeTransactionChunk(right);
return rightResult ?? leftResult;
}
}
_upsertMetaStatement(key, value, nowMs = Date.now()) {
return {
sql: `INSERT INTO ${AUTHORITY_TABLES.meta} (chat_id, meta_key, value_json, updated_at) VALUES (:chatId, :key, :valueJson, :updatedAt) ON CONFLICT(chat_id, meta_key) DO UPDATE SET value_json = excluded.value_json, updated_at = excluded.updated_at`,

View File

@@ -176,6 +176,78 @@ const relativeUnavailable = await probeAuthorityCapabilities({
assert.equal(relativeUnavailable.reason, "relative-url-unavailable");
assert.equal(relativeUnavailable.serverPrimaryReady, false);
// Regression scenarios: probeAuthorityCapabilities must surface an
// errorCategory/errorDomain pair alongside its reason for each failure mode.
// 403 with a JSON error body → permission denial.
const permissionDeniedState = await probeAuthorityCapabilities({
settings: defaultSettings,
allowRelativeUrl: true,
nowMs: 3100,
fetchImpl: async () => ({
ok: false,
status: 403,
async json() {
return { error: "permission denied" };
},
}),
});
assert.equal(permissionDeniedState.reason, "permission-denied");
assert.equal(permissionDeniedState.errorCategory, "permission");
assert.equal(permissionDeniedState.errorDomain, "authority");
// 429 → generic http-error reason but a rate-limit category.
const rateLimitedState = await probeAuthorityCapabilities({
settings: defaultSettings,
allowRelativeUrl: true,
nowMs: 3200,
fetchImpl: async () => ({
ok: false,
status: 429,
async json() {
return { error: "slow down" };
},
}),
});
assert.equal(rateLimitedState.reason, "http-error");
assert.equal(rateLimitedState.errorCategory, "rate-limit");
assert.equal(rateLimitedState.errorDomain, "authority");
// 503 whose payload declares its own category → payload category wins
// over the status-derived "server" classification.
const serverErrorState = await probeAuthorityCapabilities({
settings: defaultSettings,
allowRelativeUrl: true,
nowMs: 3300,
fetchImpl: async () => ({
ok: false,
status: 503,
async json() {
return { category: "backpressure", code: "job_queue_full" };
},
}),
});
assert.equal(serverErrorState.reason, "http-error");
assert.equal(serverErrorState.errorCategory, "backpressure");
assert.equal(serverErrorState.errorDomain, "authority");
// fetch throwing a plain Error → classified as a network failure.
const networkFailedState = await probeAuthorityCapabilities({
settings: defaultSettings,
allowRelativeUrl: true,
nowMs: 3400,
fetchImpl: async () => {
throw new Error("fetch failed");
},
});
assert.equal(networkFailedState.reason, "probe-failed");
assert.equal(networkFailedState.errorCategory, "network");
assert.equal(networkFailedState.errorDomain, "authority");
// fetch throwing an AbortError → classified as a timeout.
const timeoutState = await probeAuthorityCapabilities({
settings: defaultSettings,
allowRelativeUrl: true,
nowMs: 3500,
fetchImpl: async () => {
throw Object.assign(new Error("aborted"), { name: "AbortError" });
},
});
assert.equal(timeoutState.reason, "probe-failed");
assert.equal(timeoutState.errorCategory, "timeout");
assert.equal(timeoutState.errorDomain, "authority");
// Regression: Authority capability normalization records explicit supported job types from probe payloads.
// When a probe payload provides jobs.supportedTypes, normalizeAuthorityCapabilityState should surface
// them as supportedJobTypes and set supportedJobTypesKnown = true.

View File

@@ -168,6 +168,18 @@ class MockAuthoritySqlClient {
}
}
/**
 * Construct the mock 413 "length limit" error thrown by the fake SQL
 * client in the payload-splitting regression tests.
 * @returns {Error} Error shaped like an Authority payload-size rejection.
 */
function createLimitError() {
  return Object.assign(new Error("length limit exceeded"), {
    status: 413,
    category: "limit",
    code: "length_limit_exceeded",
  });
}
/**
 * True when a statement inserts into the graph-nodes table, matched
 * case-insensitively on the raw SQL text.
 * @param {object} statement - Statement with a `sql` string.
 * @returns {boolean}
 */
function isNodeUpsertStatement(statement = {}) {
  const sql = String(statement.sql || "").toLowerCase();
  return sql.includes("insert into st_bme_graph_nodes");
}
async function testOpenSeedsAuthorityMeta() {
const sqlClient = new MockAuthoritySqlClient();
const store = new AuthorityGraphStore("authority-chat-a", { sqlClient });
@@ -390,10 +402,104 @@ async function testConvertNamedParamsToPositional() {
assert.deepEqual(r8.params, [5]);
}
// Regression: with a small (4 KiB) byte budget, eight ~1.6 KB node upserts
// must be split across several /sql/transaction calls instead of being sent
// in one oversized batch.
async function testTransactionBatchingUsesByteBudget() {
const sqlClient = new MockAuthoritySqlClient();
// Sizes of each transaction batch that contained node-upsert statements.
const batchSizes = [];
const originalTransaction = sqlClient.transaction.bind(sqlClient);
// Spy on transaction() to record node-upsert batch sizes, then delegate.
sqlClient.transaction = async (statements = []) => {
if (statements.some(isNodeUpsertStatement)) {
batchSizes.push(statements.length);
}
return await originalTransaction(statements);
};
const store = new AuthorityGraphStore("authority-chat-byte-budget", {
sqlClient,
sqlTransactionBatchSize: 150,
// Deliberately tiny byte budget so the payload below must be chunked.
sqlTransactionMaxBytes: 4096,
});
// Each node's text repeats a 2-char CJK string 260 times (~1.5 KB UTF-8)
// to inflate the serialized payload well past the budget.
const nodes = Array.from({ length: 8 }, (_, index) => ({
id: `node-${index}`,
type: "event",
text: `payload-${index}-${"測試".repeat(260)}`,
updatedAt: index + 1,
}));
await store.bulkUpsertNodes(nodes);
// All nodes must still be persisted despite the chunking.
assert.equal((await store.listNodes()).length, nodes.length);
assert.ok(batchSizes.length > 1, "expected large payload to split into multiple transactions");
assert.ok(batchSizes.every((size) => size < nodes.length), "expected no single node transaction batch to contain all records");
}
// Regression: a transaction rejected with a 413-style error must be bisected
// into smaller batches instead of being retried at the same size forever.
async function testTransaction413SplitsWithoutRepeatingOversizedBatch() {
const sqlClient = new MockAuthoritySqlClient();
// Node-upsert batch sizes in the order they were attempted.
const attemptedBatchSizes = [];
const originalTransaction = sqlClient.transaction.bind(sqlClient);
// Reject any node-upsert batch larger than 2 statements with a limit error,
// forcing the store's bisection path; smaller batches pass through.
sqlClient.transaction = async (statements = []) => {
const nodeBatchSize = statements.filter(isNodeUpsertStatement).length;
if (nodeBatchSize > 0) {
attemptedBatchSizes.push(nodeBatchSize);
if (nodeBatchSize > 2) {
throw createLimitError();
}
}
return await originalTransaction(statements);
};
const store = new AuthorityGraphStore("authority-chat-413-split", {
sqlClient,
sqlTransactionBatchSize: 150,
// Generous byte budget so only the mocked 413 triggers splitting.
sqlTransactionMaxBytes: 512 * 1024,
});
const nodes = Array.from({ length: 6 }, (_, index) => ({
id: `node-${index}`,
type: "event",
text: `payload-${index}`,
updatedAt: index + 1,
}));
await store.bulkUpsertNodes(nodes);
// Every node persists once the splits succeed.
assert.equal((await store.listNodes()).length, nodes.length);
// The first attempt carries the full batch; later attempts must shrink.
assert.equal(attemptedBatchSizes[0], 6);
assert.ok(attemptedBatchSizes.some((size) => size <= 2), "expected oversized request to be split below the failing size");
assert.ok(attemptedBatchSizes.length <= 10, "expected bounded split attempts instead of an endless retry loop");
}
// Regression: a single statement that alone exceeds the payload limit must
// surface a terminal AuthoritySqlPayloadTooLargeError rather than being
// retried indefinitely by the bisection logic.
async function testSingleStatement413IsTerminal() {
const sqlClient = new MockAuthoritySqlClient();
let oversizedAttempts = 0;
// Always reject node upserts with a limit error; execute other statements
// directly so non-node bookkeeping still succeeds.
sqlClient.transaction = async (statements = []) => {
if (statements.some(isNodeUpsertStatement)) {
oversizedAttempts += 1;
throw createLimitError();
}
for (const statement of statements) {
await sqlClient.execute(statement.sql, statement.params || {});
}
return { executed: statements.length };
};
const store = new AuthorityGraphStore("authority-chat-single-413", { sqlClient });
await assert.rejects(
() => store.bulkUpsertNodes([{ id: "too-large", type: "event", text: "x".repeat(1024), updatedAt: 1 }]),
(error) => {
// The terminal error must carry both non-retry markers.
assert.equal(error.name, "AuthoritySqlPayloadTooLargeError");
assert.equal(error.nonRetryable, true);
assert.equal(error.terminal, true);
return true;
},
);
// The record is attempted at least once, but retries stay bounded.
assert.ok(oversizedAttempts >= 1, "expected the oversized record to be attempted at least once");
assert.ok(oversizedAttempts <= 4, "single oversized record must not retry forever");
}
await testConvertNamedParamsToPositional();
await testOpenSeedsAuthorityMeta();
await testImportCommitAndExportSnapshot();
await testPruneAndClear();
await testHttpSqlClientBoundary();
await testTransactionBatchingUsesByteBudget();
await testTransaction413SplitsWithoutRepeatingOversizedBatch();
await testSingleStatement413IsTerminal();
console.log(`${PREFIX} all tests passed`);

View File

@@ -1,5 +1,6 @@
import assert from "node:assert/strict";
import { addEdge, addNode, createEdge, createEmptyGraph, createNode } from "../graph/graph.js";
import { AuthorityHttpError } from "../runtime/authority-http-client.js";
import {
installResolveHooks,
toDataModuleUrl,
@@ -33,7 +34,10 @@ const {
normalizeAuthorityVectorConfig,
queryAuthorityTriviumNeighbors,
} = await import("../vector/authority-vector-primary-adapter.js");
const { findSimilarNodesByText: findSimilarNodesByTextFromIndex, syncGraphVectorIndex: syncGraphVectorIndexFromIndex } = await import("../vector/vector-index.js");
const {
findSimilarNodesByText: findSimilarNodesByTextFromIndex,
syncGraphVectorIndex: syncGraphVectorIndexFromIndex,
} = await import("../vector/vector-index.js");
function createAuthorityVectorGraph() {
const graph = createEmptyGraph();
@@ -66,7 +70,7 @@ function createAuthorityVectorGraph() {
return { graph, first, second };
}
function createMockTriviumClient({ failBulkUpsert = false } = {}) {
function createMockTriviumClient({ failBulkUpsert = false, failSearch = false } = {}) {
const calls = [];
return {
calls,
@@ -88,7 +92,11 @@ function createMockTriviumClient({ failBulkUpsert = false } = {}) {
async bulkUpsert(payload) {
calls.push(["bulkUpsert", payload]);
if (failBulkUpsert) {
throw new Error("trivium-down");
throw new AuthorityHttpError("trivium-down", {
status: 503,
category: "server",
path: "/trivium/bulk-upsert",
});
}
return { ok: true, upserted: payload.items?.length || 0 };
},
@@ -102,6 +110,13 @@ function createMockTriviumClient({ failBulkUpsert = false } = {}) {
},
async search(payload) {
calls.push(["search", payload]);
if (failSearch) {
throw new AuthorityHttpError("trivium search denied", {
status: 403,
category: "permission",
path: "/trivium/search",
});
}
return {
results: [
{ nodeId: "node-b", score: 0.91 },
@@ -234,9 +249,77 @@ assert.equal(isAuthorityVectorConfig(config), true);
assert.equal(graph.vectorIndexState.mode, "authority");
assert.equal(graph.vectorIndexState.dirty, true);
assert.equal(graph.vectorIndexState.dirtyReason, "authority-trivium-sync-failed");
assert.equal(result.errorCategory, "server");
assert.equal(result.errorDomain, "authority");
assert.equal(result.timings.errorCategory, "server");
assert.equal(result.timings.authorityErrorCategory, "server");
assert.equal(graph.vectorIndexState.lastErrorCategory, "server");
assert.equal(graph.vectorIndexState.lastErrorDomain, "authority");
assert.equal(result.timings.authorityDiagnostics.upsert.errorCategory, "server");
assert.equal(result.timings.authorityDiagnostics.upsert.chunks[0].errorCategory, "server");
assert.match(graph.vectorIndexState.lastWarning, /Authority Trivium 同步失败/);
}
{
const previousOverrides = globalThis.__stBmeTestOverrides;
globalThis.__stBmeTestOverrides = {
embedding: {
async embedBatch(texts = []) {
return texts.map(() => null);
},
async embedText() {
return null;
},
},
};
try {
const { graph } = createAuthorityVectorGraph();
graph.nodes.forEach((node) => {
node.embedding = null;
});
const triviumClient = createMockTriviumClient();
const result = await syncGraphVectorIndexFromIndex(graph, config, {
chatId: "chat-authority-vector",
purge: true,
triviumClient,
});
assert.match(result.error, /Embedding provider failed/);
assert.doesNotMatch(result.error, /Authority Trivium embedding failed/);
assert.equal(result.errorCategory, "embedding-provider");
assert.equal(result.errorDomain, "embedding");
assert.equal(graph.vectorIndexState.dirtyReason, "embedding-provider-sync-failed");
assert.equal(graph.vectorIndexState.lastErrorCategory, "embedding-provider");
assert.equal(graph.vectorIndexState.lastErrorDomain, "embedding");
assert.match(graph.vectorIndexState.lastWarning, /Embedding provider 同步失败/);
assert.equal(triviumClient.calls.some(([name]) => name === "bulkUpsert"), false);
} finally {
globalThis.__stBmeTestOverrides = previousOverrides;
}
}
{
const { graph, first, second } = createAuthorityVectorGraph();
const triviumClient = createMockTriviumClient({ failSearch: true });
const queryConfig = { ...config, triviumClient };
await syncGraphVectorIndexFromIndex(graph, queryConfig, {
chatId: "chat-authority-vector",
purge: true,
triviumClient,
});
const results = await findSimilarNodesByTextFromIndex(
graph,
"archive door",
queryConfig,
5,
[first, second],
);
assert.deepEqual(results, []);
assert.equal(graph.vectorIndexState.lastSearchTimings.errorCategory, "permission");
assert.equal(graph.vectorIndexState.lastSearchTimings.authorityErrorCategory, "permission");
assert.equal(graph.vectorIndexState.lastErrorCategory, "permission");
assert.equal(graph.vectorIndexState.lastErrorDomain, "authority");
}
{
const triviumClient = createMockTriviumClient();
const queryConfig = { ...config, triviumClient };

80
tests/embedding-batch.mjs Normal file
View File

@@ -0,0 +1,80 @@
// Tests for embedBatch (vector/embedding.js): batch chunking in direct mode,
// per-item fallback when a provider rejects the batch schema, backend-mode
// fallback, and partial-response recovery, using a mocked global fetch.
import assert from "node:assert/strict";
import { installResolveHooks, toDataModuleUrl } from "./helpers/register-hooks-compat.mjs";
// Stub SillyTavern host modules imported at load time by embedding.js.
installResolveHooks([
{ specifiers: ["../../../../../script.js"], url: toDataModuleUrl("export function getRequestHeaders() { return {}; }") },
{ specifiers: ["../../../../extensions.js"], url: toDataModuleUrl("export const extension_settings = { st_bme: {} };") },
]);
const { embedBatch } = await import("../vector/embedding.js");
// Build a 200 JSON Response for the mocked fetch handler.
function jsonResponse(payload) {
return new Response(JSON.stringify(payload), { status: 200, headers: { "Content-Type": "application/json" } });
}
// Swap globalThis.fetch for `handler` while `fn` runs; always restore it.
async function withFetch(handler, fn) {
const previousFetch = globalThis.fetch;
globalThis.fetch = handler;
try { return await fn(); } finally { globalThis.fetch = previousFetch; }
}
// Convert typed-array vectors to plain arrays for deepEqual comparisons.
const plain = (vectors) => vectors.map((vector) => (vector ? Array.from(vector) : null));
// Case 1: direct mode honors embeddingBatchSize and splits 3 texts into
// batches of 2 + 1.
{
const calls = [];
await withFetch(async (_url, options = {}) => {
const body = JSON.parse(String(options.body || "{}"));
calls.push(body);
return jsonResponse({ data: body.input.map((text, index) => ({ index, embedding: [String(text).length, index] })) });
}, async () => {
const vectors = await embedBatch(["alpha", "beta", "gamma"], { mode: "direct", apiUrl: "https://example.com/v1", apiKey: "sk-test", model: "test-embedding", embeddingBatchSize: 2 });
assert.deepEqual(plain(vectors), [[5, 0], [4, 1], [5, 0]]);
});
assert.deepEqual(calls.map((call) => call.input), [["alpha", "beta"], ["gamma"]]);
}
// Case 2: when the direct provider rejects the batched `input` array with an
// HTTP 400, embedBatch falls back to one single-text request per item.
{
const calls = [];
await withFetch(async (_url, options = {}) => {
const body = JSON.parse(String(options.body || "{}"));
calls.push(body);
if (Array.isArray(body.input)) return new Response("batch schema rejected", { status: 400 });
return jsonResponse({ data: [{ index: 0, embedding: [String(body.input).length, 9] }] });
}, async () => {
const vectors = await embedBatch(["first", "second"], { mode: "direct", apiUrl: "https://example.com/v1/embeddings", model: "test-embedding", embeddingBatchSize: 2 });
assert.deepEqual(plain(vectors), [[5, 9], [6, 9]]);
});
assert.deepEqual(calls.map((call) => call.input), [["first", "second"], "first", "second"]);
}
// Case 3: backend mode falls back to single-text `text` payloads when the
// batched `texts` payload is rejected.
{
const calls = [];
await withFetch(async (_url, options = {}) => {
const body = JSON.parse(String(options.body || "{}"));
calls.push(body);
if (Array.isArray(body.texts)) return new Response("backend batch rejected", { status: 400 });
return jsonResponse({ vector: [String(body.text).length, 3] });
}, async () => {
const vectors = await embedBatch(["uno", "dos"], { mode: "backend", source: "openai", model: "text-embedding-3-small", embeddingBatchSize: 2 });
assert.deepEqual(plain(vectors), [[3, 3], [3, 3]]);
});
assert.deepEqual(calls.map((call) => [call.texts, call.text]), [[["uno", "dos"], undefined], [undefined, "uno"], [undefined, "dos"]]);
}
// Case 4: a partial batch response (one item missing) keeps the returned
// vector and re-requests only the missing item individually.
{
const calls = [];
await withFetch(async (_url, options = {}) => {
const body = JSON.parse(String(options.body || "{}"));
calls.push(body);
if (Array.isArray(body.input)) {
return jsonResponse({ data: [{ index: 0, embedding: [1, 1] }] });
}
return jsonResponse({ data: [{ index: 0, embedding: [String(body.input).length, 7] }] });
}, async () => {
const vectors = await embedBatch(["kept", "fallback"], { mode: "direct", apiUrl: "https://example.com/v1", model: "test-embedding", embeddingBatchSize: 2 });
assert.deepEqual(plain(vectors), [[1, 1], [8, 7]]);
});
assert.deepEqual(calls.map((call) => call.input), [["kept", "fallback"], "fallback"]);
}
console.log("embedding-batch tests passed");

View File

@@ -482,6 +482,19 @@ async function createGraphPersistenceHarness({
imported: importResult.imported,
};
}
// Commit a graph delta against the Authority snapshot store for this chat.
// Thin wrapper around commitSnapshotDelta bound to the Authority snapshot
// getter/setter; stamps the snapshot meta with the Authority storage
// kind/mode so persistence provenance is visible to assertions downstream.
async commitDelta(delta = {}, options = {}) {
return commitSnapshotDelta({
targetChatId: this.chatId,
delta,
options,
getSnapshot: getAuthoritySnapshotForChat,
setSnapshot: setAuthoritySnapshotForChat,
metaPatch: {
storagePrimary: AUTHORITY_GRAPH_STORE_KIND,
storageMode: AUTHORITY_GRAPH_STORE_MODE,
},
});
}
async isEmpty() {
const snapshot = getAuthoritySnapshotForChat(this.chatId);
const nodes = Array.isArray(snapshot?.nodes) ? snapshot.nodes.length : 0;
@@ -517,9 +530,17 @@ async function createGraphPersistenceHarness({
}
}
function commitIndexedDbDelta(targetChatId = "", delta = {}, options = {}) {
function commitSnapshotDelta({
targetChatId = "",
delta = {},
options = {},
getSnapshot,
setSnapshot,
metaPatch = {},
} = {}) {
const normalizedChatId = String(targetChatId || "");
const currentSnapshot = getIndexedDbSnapshotForChat(normalizedChatId);
const currentSnapshot =
typeof getSnapshot === "function" ? getSnapshot(normalizedChatId) : null;
const now = Date.now();
const nodeMap = new Map(
@@ -580,6 +601,7 @@ async function createGraphPersistenceHarness({
meta: {
...(currentSnapshot?.meta || {}),
...runtimeMetaPatch,
...(metaPatch && typeof metaPatch === "object" ? structuredClone(metaPatch) : {}),
chatId: normalizedChatId,
revision: nextRevision,
lastModified: now,
@@ -598,9 +620,9 @@ async function createGraphPersistenceHarness({
state: nextState,
};
setIndexedDbSnapshotForChat(normalizedChatId, nextSnapshot);
runtimeContext.__indexedDbSnapshot =
getIndexedDbSnapshotForChat(normalizedChatId);
if (typeof setSnapshot === "function") {
setSnapshot(normalizedChatId, nextSnapshot);
}
return {
revision: nextRevision,
@@ -620,6 +642,20 @@ async function createGraphPersistenceHarness({
};
}
// Commit a graph delta against the IndexedDB-backed snapshot store, then
// mirror the committed snapshot onto runtimeContext.__indexedDbSnapshot so
// the test harness can observe the latest persisted state directly.
function commitIndexedDbDelta(targetChatId = "", delta = {}, options = {}) {
const result = commitSnapshotDelta({
targetChatId,
delta,
options,
getSnapshot: getIndexedDbSnapshotForChat,
setSnapshot: setIndexedDbSnapshotForChat,
});
runtimeContext.__indexedDbSnapshot = getIndexedDbSnapshotForChat(
String(targetChatId || ""),
);
return result;
}
const runtimeContext = {
console,
Date,
@@ -1608,6 +1644,7 @@ result = {
retryPendingGraphPersist,
persistExtractionBatchResult,
shouldUseAuthorityJobs,
shouldUseAuthorityGraphStore,
onRebuildLocalCacheFromLukerSidecar,
saveGraphToIndexedDb,
cloneGraphForPersistence,
@@ -4093,6 +4130,81 @@ result = {
);
}
{
const harness = await createGraphPersistenceHarness({
chatId: "chat-authority-sql-storage-only",
globalChatId: "chat-authority-sql-storage-only",
});
harness.runtimeContext.extension_settings[MODULE_NAME] = {
authorityEnabled: "on",
authorityPrimaryWhenAvailable: true,
authorityStorageMode: "server-primary",
authoritySqlPrimary: true,
};
const capability = harness.api.setAuthorityCapabilityState({
installed: true,
healthy: true,
sessionReady: true,
permissionReady: true,
features: ["sql.query", "sql.mutation"],
reason: "missing-required-features",
lastProbeAt: Date.now(),
});
assert.equal(
capability.serverPrimaryReady,
false,
"缺少 jobs/blob/trivium 时整体 Authority server-primary 应保持降级显示",
);
assert.equal(
capability.storagePrimaryReady,
true,
"SQL 存储能力已就绪时图谱主存储应可用",
);
assert.equal(
harness.api.shouldUseAuthorityGraphStore(
harness.runtimeContext.extension_settings[MODULE_NAME],
capability,
),
true,
"Authority SQL 图谱主存储不应被 jobs/blob/trivium 附属能力误伤",
);
assert.equal(
harness.api.shouldUseAuthorityJobs({ mode: "authority", source: "authority-trivium" }),
false,
"jobs 不可用时 Authority job 提交仍应被禁用",
);
harness.api.setCurrentGraph(
stampPersistedGraph(
createMeaningfulGraph("chat-authority-sql-storage-only", "authority-sql-storage-only"),
{
revision: 6,
integrity: "chat-authority-sql-storage-only",
chatId: "chat-authority-sql-storage-only",
reason: "authority-sql-storage-only-seed",
},
),
);
const persistResult = await harness.api.persistExtractionBatchResult({
reason: "authority-sql-storage-only-persist",
lastProcessedAssistantFloor: 6,
});
assert.equal(persistResult.accepted, true);
assert.equal(persistResult.storageTier, "authority-sql");
assert.equal(persistResult.acceptedBy, "authority-sql");
assert.equal(
Number(
harness.api.getAuthoritySnapshotForChat("chat-authority-sql-storage-only")?.meta
?.revision || 0,
),
persistResult.revision,
"SQL-only Authority capability should still perform accepted Authority SQL graph persistence",
);
}
{
const harness = await createGraphPersistenceHarness({
chatId: "chat-b",
@@ -4587,6 +4699,88 @@ result = {
);
}
{
const chatId = "chat-luker-authority-sql-primary";
const persistenceChatId = "meta-luker-authority-sql-primary";
const harness = await createGraphPersistenceHarness({
chatId,
globalChatId: chatId,
characterId: "char-luker-authority-sql",
chatMetadata: {
integrity: persistenceChatId,
},
});
harness.runtimeContext.Luker = {
getContext() {
return harness.runtimeContext.__chatContext;
},
};
harness.runtimeContext.extension_settings[MODULE_NAME] = {
authorityEnabled: "on",
authorityPrimaryWhenAvailable: true,
authorityStorageMode: "server-primary",
authoritySqlPrimary: true,
authorityBrowserCacheMode: "minimal",
};
harness.api.setAuthorityCapabilityState({
installed: true,
healthy: true,
sessionReady: true,
permissionReady: true,
minimumFeatureSetReady: true,
serverPrimaryReady: true,
storagePrimaryReady: true,
triviumPrimaryReady: true,
jobsReady: true,
blobReady: true,
features: [
"sql.query",
"sql.mutation",
"trivium.search",
"jobs",
"blob",
],
supportedJobTypes: ["delay"],
supportedJobTypesKnown: true,
reason: "ok",
lastProbeAt: Date.now(),
});
harness.api.setCurrentGraph(
stampPersistedGraph(
createMeaningfulGraph(chatId, "luker-authority-sql"),
{
revision: 9,
integrity: persistenceChatId,
chatId,
reason: "luker-authority-sql-seed",
},
),
);
const result = await harness.api.persistExtractionBatchResult({
reason: "luker-authority-sql-persist",
lastProcessedAssistantFloor: 9,
});
assert.equal(result.accepted, true);
assert.equal(result.storageTier, "authority-sql");
assert.equal(result.acceptedBy, "authority-sql");
assert.equal(result.primaryTier, "authority-sql");
assert.equal(result.cacheTier, "none");
assert.equal(
await harness.runtimeContext.__chatContext.getChatState(
LUKER_GRAPH_MANIFEST_NAMESPACE,
),
null,
"Authority SQL primary in Luker must not be preempted by Luker sidecar manifest",
);
assert.equal(
Number(harness.api.getAuthoritySnapshotForChat(persistenceChatId)?.meta?.revision || 0),
result.revision,
"Authority SQL snapshot should receive the accepted persist revision",
);
}
{
const chatId = "chat-luker-no-authority-primary";
const harness = await createGraphPersistenceHarness({

View File

@@ -0,0 +1,58 @@
// Tests for testVectorConnection (vector/vector-index.js): the connection
// probe in direct mode and backend mode, including the batch-capability and
// vector-store-capability checks, using a mocked global fetch.
import assert from "node:assert/strict";
import { installResolveHooks, toDataModuleUrl } from "./helpers/register-hooks-compat.mjs";
// Stub SillyTavern host modules imported at load time by vector-index.js.
installResolveHooks([
{ specifiers: ["../../../../../script.js"], url: toDataModuleUrl("export function getRequestHeaders() { return {}; }") },
{ specifiers: ["../../../../extensions.js"], url: toDataModuleUrl("export const extension_settings = { st_bme: {} };") },
]);
const { testVectorConnection } = await import("../vector/vector-index.js");
// Build a 200 JSON Response for the mocked fetch handler.
function jsonResponse(payload) {
return new Response(JSON.stringify(payload), { status: 200, headers: { "Content-Type": "application/json" } });
}
// Swap globalThis.fetch for `handler` while `fn` runs; always restore it.
async function withFetch(handler, fn) {
const previousFetch = globalThis.fetch;
globalThis.fetch = handler;
try { return await fn(); } finally { globalThis.fetch = previousFetch; }
}
// Case 1: direct mode sends both probe texts as one batched `input` array and
// reports dimensions plus batch capability from the response.
{
const calls = [];
const result = await withFetch(async (_url, options = {}) => {
const body = JSON.parse(String(options.body || "{}"));
calls.push(body);
assert.equal(Array.isArray(body.input), true);
return jsonResponse({ data: body.input.map((text, index) => ({ index, embedding: [1, index, String(text).length] })) });
}, async () => await testVectorConnection({ mode: "direct", apiUrl: "https://example.com/v1", apiKey: "sk-test", model: "test-embedding" }));
assert.equal(result.success, true);
assert.equal(result.dimensions, 3);
assert.equal(result.batchCapable, true);
assert.equal(result.mode, "direct");
assert.deepEqual(calls[0].input, ["test connection", "runtime batch probe"]);
}
// Case 2: backend mode probes /api/vector/embed with a batched `texts`
// payload, then probes the vector store via /api/vector/query.
{
const calls = [];
const result = await withFetch(async (url, options = {}) => {
const body = JSON.parse(String(options.body || "{}"));
calls.push({ url: String(url), body });
if (String(url) === "/api/vector/embed") {
assert.equal(Array.isArray(body.texts), true);
return jsonResponse({ vectors: body.texts.map((text, index) => [2, index, String(text).length]) });
}
assert.equal(String(url), "/api/vector/query");
return jsonResponse({ hashes: [] });
}, async () => await testVectorConnection({ mode: "backend", source: "openai", model: "text-embedding-3-small" }));
assert.equal(result.success, true);
assert.equal(result.dimensions, 3);
assert.equal(result.batchCapable, true);
assert.equal(result.vectorStoreCapable, true);
assert.equal(result.mode, "backend");
assert.deepEqual(calls[0].body.texts, ["test connection", "runtime batch probe"]);
assert.equal(calls[1].url, "/api/vector/query");
assert.equal(calls[1].body.searchText, "test connection");
}
console.log("vector-connection-probe tests passed");

View File

@@ -0,0 +1,44 @@
// Tests for runtime/vector-sync-coalescer.js: range normalization/merging and
// the enqueue/start/drop/clear lifecycle of the sync task coalescer.
import assert from "node:assert/strict";
import {
createVectorSyncCoalescer,
mergeVectorSyncRange,
normalizeVectorSyncRange,
} from "../runtime/vector-sync-coalescer.js";
// Range helpers: reversed bounds are normalized; non-numeric bounds reject;
// merging takes the union; a null side makes the merge null.
assert.deepEqual(normalizeVectorSyncRange({ start: 9, end: 3 }), { start: 3, end: 9 });
assert.equal(normalizeVectorSyncRange({ start: "x", end: 3 }), null);
assert.deepEqual(mergeVectorSyncRange({ start: 2, end: 4 }, { start: 9, end: 6 }), { start: 2, end: 9 });
assert.equal(mergeVectorSyncRange(null, { start: 1, end: 2 }), null);
const coalescer = createVectorSyncCoalescer();
// First task for a scope schedules immediately.
const first = coalescer.enqueue({ id: "first", chatId: "chat-a", modelScope: "direct:model", range: { start: 4, end: 8 }, mode: "balanced", reason: "after-extraction" });
assert.equal(first.scheduled, true);
// A second task for the same scope coalesces into the pending one: ranges
// merge and the newer mode wins, but no new schedule happens.
const second = coalescer.enqueue({ id: "second", chatId: "chat-a", modelScope: "direct:model", range: { start: 1, end: 2 }, mode: "fast", reason: "after-edit" });
assert.equal(second.scheduled, false);
assert.equal(second.coalesced, true);
assert.equal(second.task.id, "first");
assert.deepEqual(second.task.range, { start: 1, end: 8 });
assert.equal(second.task.mode, "fast");
// Once the pending task starts, the next enqueue schedules a fresh task…
assert.equal(coalescer.start(first.task), true);
const third = coalescer.enqueue({ id: "third", chatId: "chat-a", modelScope: "direct:model", range: { start: 10, end: 12 } });
assert.equal(third.scheduled, true);
assert.equal(third.task.id, "third");
// …and further enqueues coalesce into that new pending task.
const fourth = coalescer.enqueue({ id: "fourth", chatId: "chat-a", modelScope: "direct:model", range: { start: 20, end: 21 } });
assert.equal(fourth.scheduled, false);
assert.deepEqual(third.task.range, { start: 10, end: 21 });
// clear() (e.g. on chat change) stales every outstanding task.
coalescer.clear("chat-changed");
assert.equal(coalescer.isStale(first.task, "chat-a"), true);
assert.equal(coalescer.isStale(third.task, "chat-a"), true);
// drop() removes a rejected pending task so a replacement can schedule.
const rejected = createVectorSyncCoalescer();
const rejectedFirst = rejected.enqueue({ id: "rejected-first", chatId: "chat-a", modelScope: "direct:model" });
assert.equal(rejected.drop(rejectedFirst.task, "queue-full"), true);
assert.equal(rejected.getPending(), null, "drop returns pending state to empty after queue rejection");
assert.equal(rejected.isStale(rejectedFirst.task, "chat-a"), true);
const rejectedSecond = rejected.enqueue({ id: "rejected-second", chatId: "chat-a", modelScope: "direct:model" });
assert.equal(rejectedSecond.scheduled, true, "new task should schedule after rejected pending is dropped");
assert.equal(rejectedSecond.task.id, "rejected-second");
console.log("vector-sync-coalescer tests passed");

View File

@@ -2,8 +2,9 @@ import { normalizeAuthorityBaseUrl } from "../runtime/authority-capabilities.js"
import {
AUTHORITY_PROTOCOL_SERVER_PLUGIN_V06,
AuthorityHttpClient,
AuthorityHttpError,
} from "../runtime/authority-http-client.js";
import { embedText } from "./embedding.js";
import { embedBatch } from "./embedding.js";
export const AUTHORITY_VECTOR_MODE = "authority";
export const AUTHORITY_VECTOR_SOURCE = "authority-trivium";
@@ -14,6 +15,7 @@ const MAX_AUTHORITY_VECTOR_CHUNK_SIZE = 2000;
const DEFAULT_AUTHORITY_PURGE_PAGE_SIZE = 200;
const DEFAULT_AUTHORITY_PURGE_MAX_PAGES = 1000;
const DEFAULT_AUTHORITY_EMBEDDING_BACKEND_SOURCE = "openai";
const AUTHORITY_CONNECTION_PROBE_TEXTS = ["test connection", "runtime batch probe"];
function clampInteger(value, fallback, min, max) {
const parsed = Number(value);
@@ -89,6 +91,25 @@ function hasPlainKeys(value = null) {
return isPlainObject(value) && Object.keys(value).length > 0;
}
// Extract the Authority error category label from an error-like object.
// Checks `category` first, then `errorCategory` (empty strings fall through),
// and returns the trimmed value; null/undefined errors yield "".
function getAuthorityErrorCategory(error = null) {
  const raw = error?.category || error?.errorCategory || "";
  return String(raw).trim();
}
// Classify the domain that produced an error: "authority" when it is an
// AuthorityHttpError instance or carries a category/errorCategory field,
// otherwise "" (unknown). Null/undefined errors always yield "".
function getAuthorityErrorDomain(error = null) {
if (!error) return "";
return error instanceof AuthorityHttpError || getAuthorityErrorCategory(error) ? "authority" : "";
}
// Build a spreadable diagnostics patch from an error. Only fields that can be
// derived are included: errorCategory/authorityErrorCategory when a category
// is known, errorDomain/authorityErrorDomain when the domain resolves, and a
// numeric HTTP `status` when it is > 0. Returns {} for a null error.
function buildAuthorityErrorDiagnostics(error = null) {
const category = getAuthorityErrorCategory(error);
const domain = getAuthorityErrorDomain(error);
return {
...(category ? { errorCategory: category, authorityErrorCategory: category } : {}),
...(domain ? { errorDomain: domain, authorityErrorDomain: domain } : {}),
...(Number(error?.status || 0) > 0 ? { status: Number(error.status) } : {}),
};
}
function normalizeOpenAICompatibleBaseUrl(value) {
return String(value || "")
.trim()
@@ -816,6 +837,7 @@ export async function upsertAuthorityTriviumEntries(graph, config = {}, entries
durationMs: roundMs(nowMs() - chunkStartedAt),
ok: false,
error: error?.message || String(error),
...buildAuthorityErrorDiagnostics(error),
});
error.authorityDiagnostics = {
operation: "bulkUpsert",
@@ -824,6 +846,7 @@ export async function upsertAuthorityTriviumEntries(graph, config = {}, entries
chunks,
totalBytes,
totalMs: roundMs(nowMs() - startedAt),
...buildAuthorityErrorDiagnostics(error),
};
throw error;
}
@@ -920,9 +943,19 @@ export async function searchAuthorityTriviumNodes(graph, text, config = {}, opti
}
export async function testAuthorityTriviumConnection(config = {}, options = {}) {
const probeVector = await embedText("test connection", config, { isQuery: true });
const probeVectors = await embedBatch(AUTHORITY_CONNECTION_PROBE_TEXTS, config, {
isQuery: true,
});
const probeVector = probeVectors.find((vector) => vector && vector.length > 0);
if (!probeVector || probeVector.length === 0) {
return { success: false, dimensions: 0, error: "Embedding API 返回空结果" };
return {
success: false,
dimensions: 0,
error: "Embedding API 批量返回空结果",
batchCapable: false,
mode: AUTHORITY_VECTOR_MODE,
source: AUTHORITY_VECTOR_SOURCE,
};
}
const client = createAuthorityTriviumClient(config, options);
await callClient(client, ["stat"], "stat", {
@@ -930,5 +963,12 @@ export async function testAuthorityTriviumConnection(config = {}, options = {})
collectionId: options.collectionId,
chatId: options.chatId,
});
return { success: true, dimensions: probeVector.length, error: "" };
return {
success: true,
dimensions: probeVector.length,
error: "",
batchCapable: true,
mode: AUTHORITY_VECTOR_MODE,
source: AUTHORITY_VECTOR_SOURCE,
};
}

View File

@@ -12,6 +12,8 @@ import { resolveConfiguredTimeoutMs } from "../runtime/request-timeout.js";
const MODULE_NAME = "st_bme";
const EMBEDDING_REQUEST_TIMEOUT_MS = 300000;
const DEFAULT_EMBEDDING_BATCH_SIZE = 10;
const MAX_EMBEDDING_BATCH_SIZE = 100;
const BACKEND_SOURCES_REQUIRING_API_URL = new Set([
"ollama",
"llamacpp",
@@ -110,6 +112,94 @@ async function requestBackendEmbeddings(config = {}, payload = {}, { signal } =
return await response.json().catch(() => ({}));
}
// Resolve the embedding batch size from config (`embeddingBatchSize`, falling
// back to `batchSize`). Non-numeric or non-positive values yield
// DEFAULT_EMBEDDING_BATCH_SIZE; otherwise the value is truncated to an
// integer and clamped to [1, MAX_EMBEDDING_BATCH_SIZE].
function getEmbeddingBatchSize(config = {}) {
const parsed = Number(config?.embeddingBatchSize ?? config?.batchSize);
if (!Number.isFinite(parsed) || parsed <= 0) {
return DEFAULT_EMBEDDING_BATCH_SIZE;
}
return Math.min(MAX_EMBEDDING_BATCH_SIZE, Math.max(1, Math.trunc(parsed)));
}
// Partition `texts` into consecutive chunks of at most `size` items, keeping
// each chunk's absolute start offset so callers can map results back to the
// original positions. Returns [] for empty input.
function chunkTexts(texts = [], size = DEFAULT_EMBEDDING_BATCH_SIZE) {
  const chunks = [];
  let offset = 0;
  while (offset < texts.length) {
    chunks.push({ start: offset, texts: texts.slice(offset, offset + size) });
    offset += size;
  }
  return chunks;
}
// Issue one OpenAI-compatible POST /embeddings call for a whole batch of
// texts against the configured direct API endpoint.
// @param {string[]} texts - batch of texts sent as the `input` array
// @param {object} config - direct-mode config (apiUrl, apiKey, model, timeout)
// @param {{signal?: AbortSignal}} [options]
// @returns {Promise<Array>} one normalized vector (or null) per input text,
//   aligned by index even when the provider returns items out of order
// @throws {Error} with `.status` set on non-OK HTTP responses, or a schema
//   error when the response has no `data` array
async function requestDirectEmbeddingBatch(texts, config = {}, { signal } = {}) {
const apiUrl = normalizeOpenAICompatibleBaseUrl(config?.apiUrl);
const response = await fetchWithTimeout(
apiUrl + "/embeddings",
{
method: "POST",
headers: {
"Content-Type": "application/json",
...(config.apiKey ? { Authorization: "Bearer " + config.apiKey } : {}),
},
signal,
body: JSON.stringify({
model: config.model,
input: texts,
}),
},
getConfiguredTimeoutMs(config),
);
if (!response.ok) {
// Surface the response body (or status text) as the error message and keep
// the HTTP status on the error object for category classification upstream.
const errorText = await response.text().catch(() => response.statusText);
const error = new Error(errorText || response.statusText || "HTTP " + response.status);
error.status = response.status;
throw error;
}
const data = await response.json().catch(() => ({}));
const embeddings = Array.isArray(data?.data) ? data.data : null;
if (!embeddings) {
throw new Error("Embedding API 返回格式异常");
}
// Pre-fill with null so texts the provider omitted remain null (callers use
// that to trigger per-item fallback).
const results = new Array(texts.length).fill(null);
embeddings.forEach((item, order) => {
// Prefer the provider-declared index; fall back to arrival order when the
// API does not echo indices.
const rawIndex = Number(item?.index);
const index = Number.isInteger(rawIndex) ? rawIndex : order;
if (index >= 0 && index < results.length) {
results[index] = normalizeVector(item?.embedding);
}
});
return results;
}
// Request embeddings for a whole batch of texts through the SillyTavern
// backend vector endpoint (payload shape: { texts, isQuery }).
// @param {string[]} texts - texts to embed in one call
// @param {object} config - backend-mode embedding configuration
// @param {{signal?: AbortSignal, isQuery?: boolean}} [options]
// @returns {Promise<Array>} one normalized vector (or null) per input text
// @throws {Error} when the response payload has no `vectors` array
async function requestBackendEmbeddingBatch(texts, config = {}, { signal, isQuery = false } = {}) {
const payload = await requestBackendEmbeddings(
config,
{ texts, isQuery },
{ signal },
);
const vectors = Array.isArray(payload?.vectors) ? payload.vectors : null;
if (!vectors) {
throw new Error("Backend Embedding API 返回格式异常");
}
// Map positionally; any missing entry normalizes to null.
return texts.map((_, index) => normalizeVector(vectors[index]));
}
// Per-item fallback used when a batch embedding request fails or returns
// partial results: embed each text individually via embedText. Abort errors
// are rethrown so cancellation still propagates; any other per-item failure
// is logged and recorded as null so one bad item cannot sink the whole chunk.
async function fallbackEmbedChunkTexts(texts, config = {}, { signal, isQuery = false } = {}) {
const vectors = [];
for (const text of texts) {
try {
vectors.push(await embedText(text, config, { signal, isQuery }));
} catch (error) {
if (isAbortError(error)) {
throw error;
}
console.error("[ST-BME] Embedding 单条回退失败:", error);
vectors.push(null);
}
}
return vectors;
}
function createCombinedAbortSignal(...signals) {
const validSignals = signals.filter(Boolean);
if (validSignals.length <= 1) {
@@ -264,91 +354,78 @@ export async function embedText(text, config, { signal, isQuery = false } = {})
* @returns {Promise<(Float64Array|null)[]>}
*/
export async function embedBatch(texts, config, { signal, isQuery = false } = {}) {
const normalizedTexts = Array.isArray(texts)
? texts.map((item) => String(item ?? ""))
: [];
const override = getEmbeddingTestOverride("embedBatch");
if (override) {
return await override(texts, config, { signal, isQuery });
return await override(normalizedTexts, config, { signal, isQuery });
}
if (readEmbeddingMode(config) === "backend") {
if (!texts.length || !config?.model) {
return texts.map(() => null);
}
try {
const payload = await requestBackendEmbeddings(
config,
{ texts, isQuery },
{ signal },
);
const vectors = Array.isArray(payload?.vectors) ? payload.vectors : [];
return texts.map((_, index) => normalizeVector(vectors[index]));
} catch (e) {
if (isAbortError(e)) {
throw e;
}
console.error("[ST-BME] Backend Embedding 批量调用失败:", e);
return texts.map(() => null);
}
if (!normalizedTexts.length) {
return [];
}
const isBackend = readEmbeddingMode(config) === "backend";
const apiUrl = normalizeOpenAICompatibleBaseUrl(config?.apiUrl);
if (!texts.length || !apiUrl || !config?.model) {
return texts.map(() => null);
if (!config?.model || (!isBackend && !apiUrl)) {
return normalizedTexts.map(() => null);
}
try {
const response = await fetchWithTimeout(
`${apiUrl}/embeddings`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
...(config.apiKey
? { Authorization: `Bearer ${config.apiKey}` }
: {}),
},
signal,
body: JSON.stringify({
model: config.model,
input: texts,
}),
},
getConfiguredTimeoutMs(config),
);
if (!response.ok) {
const errorText = await response.text();
console.error(
`[ST-BME] Embedding API 批量错误 (${response.status}):`,
errorText,
);
return texts.map(() => null);
}
const data = await response.json();
const embeddings = data?.data;
if (!Array.isArray(embeddings)) {
return texts.map(() => null);
}
// 按 index 排序API 可能不保证顺序)
embeddings.sort((a, b) => a.index - b.index);
return embeddings.map((item) => {
if (item?.embedding && Array.isArray(item.embedding)) {
return new Float64Array(item.embedding);
const results = new Array(normalizedTexts.length).fill(null);
const batchSize = getEmbeddingBatchSize(config);
for (const chunk of chunkTexts(normalizedTexts, batchSize)) {
let vectors = null;
try {
vectors = isBackend
? await requestBackendEmbeddingBatch(chunk.texts, config, { signal, isQuery })
: await requestDirectEmbeddingBatch(chunk.texts, config, { signal });
} catch (error) {
if (isAbortError(error)) {
throw error;
}
return null;
});
} catch (e) {
if (isAbortError(e)) {
throw e;
console.error(
isBackend
? "[ST-BME] Backend Embedding 批量调用失败:"
: "[ST-BME] Embedding API 批量调用失败:",
error,
);
}
console.error("[ST-BME] Embedding API 批量调用失败:", e);
return texts.map(() => null);
}
}
if (!vectors || vectors.length < chunk.texts.length) {
vectors = await fallbackEmbedChunkTexts(chunk.texts, config, {
signal,
isQuery,
});
} else {
const missingIndexes = [];
for (let index = 0; index < chunk.texts.length; index++) {
if (!vectors[index]) {
missingIndexes.push(index);
}
}
if (missingIndexes.length > 0) {
const fallbackVectors = await fallbackEmbedChunkTexts(
missingIndexes.map((index) => chunk.texts[index]),
config,
{
signal,
isQuery,
},
);
missingIndexes.forEach((missingIndex, fallbackIndex) => {
vectors[missingIndex] = fallbackVectors[fallbackIndex] || null;
});
}
}
for (let index = 0; index < chunk.texts.length; index++) {
results[chunk.start + index] = vectors[index] || null;
}
}
return results;
}
/**
* 计算两个向量的 cosine 相似度
*

View File

@@ -64,6 +64,8 @@ function getConfiguredTimeoutMs(config = {}) {
})();
}
const VECTOR_CONNECTION_PROBE_TEXTS = ["test connection", "runtime batch probe"];
const BACKEND_STATUS_MODEL_SOURCES = {
openai: "openai",
cohere: "cohere",
@@ -626,6 +628,7 @@ function markAuthorityVectorStateDirty(
config = {},
reason = "authority-trivium-failed",
warning = "Authority Trivium 索引失败,已标记待重建",
diagnostics = {},
) {
if (!graph?.vectorIndexState || !isAuthorityVectorConfig(config)) {
return;
@@ -655,6 +658,39 @@ function markAuthorityVectorStateDirty(
pending: total > 0 ? Math.max(1, Number(state.lastStats?.pending || 0)) : 0,
};
state.lastWarning = String(warning || "Authority Trivium 索引失败,已标记待重建");
const errorCategory = String(diagnostics.errorCategory || diagnostics.authorityErrorCategory || "").trim();
const errorDomain = String(diagnostics.errorDomain || diagnostics.authorityErrorDomain || "").trim();
if (errorCategory) state.lastErrorCategory = errorCategory;
if (errorDomain) state.lastErrorDomain = errorDomain;
}
// Read an error's category label: `category` first, then `errorCategory`
// (empty strings fall through), trimmed; "" when absent or error is null.
function getErrorCategory(error = null) {
  const label = error?.category || error?.errorCategory || "";
  return String(label).trim();
}
// Resolve an error's domain label.
// Priority: explicit error.errorDomain (trimmed) → "authority" (or the
// caller-supplied fallback) when the error carries a category → fallback.
// Null/undefined errors always yield "".
function getErrorDomain(error = null, fallback = "") {
if (!error) return "";
if (error?.errorDomain) return String(error.errorDomain).trim();
if (getErrorCategory(error)) return fallback || "authority";
return fallback;
}
// Build a spreadable diagnostics patch from an error: category fields when a
// category is known, domain fields when a domain resolves (defaulting to
// "authority" when only a category exists), and a positive numeric HTTP
// `status` when present. Underivable fields are omitted entirely so callers
// can spread the patch without clobbering existing values.
function getAuthorityDiagnosticsErrorPatch(error = null) {
const errorCategory = getErrorCategory(error);
const errorDomain = getErrorDomain(error, errorCategory ? "authority" : "");
return {
...(errorCategory ? { errorCategory, authorityErrorCategory: errorCategory } : {}),
...(errorDomain ? { errorDomain, authorityErrorDomain: errorDomain } : {}),
...(Number(error?.status || 0) > 0 ? { status: Number(error.status) } : {}),
};
}
// Construct the canonical embedding-provider failure error, tagged with
// errorCategory "embedding-provider" and errorDomain "embedding" so sync
// error handling can distinguish it from Authority transport failures.
// Non-numeric or negative `failures` counts are reported as 0.
function createEmbeddingProviderError(failures = 0) {
  const numeric = Number(failures) || 0;
  const count = numeric > 0 ? Math.floor(numeric) : 0;
  const error = new Error(`Embedding provider failed for ${count} item(s)`);
  error.errorCategory = "embedding-provider";
  error.errorDomain = "embedding";
  return error;
}
async function ensureEntryEmbeddings(graph, entries = [], config = {}, signal = undefined) {
@@ -802,7 +838,7 @@ export async function syncGraphVectorIndex(
embeddingsRequested += embeddingResult.requested;
embedBatchMs += embeddingResult.elapsedMs;
if (embeddingResult.failures > 0) {
throw new Error(`Authority Trivium embedding failed for ${embeddingResult.failures} item(s)`);
throw createEmbeddingProviderError(embeddingResult.failures);
}
const purgeStartedAt = nowMs();
const purgeResult = await purgeAuthorityTriviumNamespace(config, authorityOptions);
@@ -866,7 +902,7 @@ export async function syncGraphVectorIndex(
embeddingsRequested += embeddingResult.requested;
embedBatchMs += embeddingResult.elapsedMs;
if (embeddingResult.failures > 0) {
throw new Error(`Authority Trivium embedding failed for ${embeddingResult.failures} item(s)`);
throw createEmbeddingProviderError(embeddingResult.failures);
}
deletedNodeCount = nodeIdsToDelete.length;
const deleteStartedAt = nowMs();
@@ -909,17 +945,29 @@ export async function syncGraphVectorIndex(
} catch (error) {
if (isAbortError(error)) throw error;
const message = error?.message || String(error) || "Authority Trivium 同步失败";
const errorCategory = getErrorCategory(error);
const errorDomain = getErrorDomain(error, errorCategory ? "authority" : "");
const dirtyReason = errorDomain === "embedding"
? "embedding-provider-sync-failed"
: "authority-trivium-sync-failed";
const warningPrefix = errorDomain === "embedding"
? "Embedding provider 同步失败"
: "Authority Trivium 同步失败";
markAuthorityVectorStateDirty(
graph,
config,
"authority-trivium-sync-failed",
`Authority Trivium 同步失败${message}),已标记待重建`,
dirtyReason,
`${warningPrefix}${message}),已标记待重建`,
{ errorCategory, errorDomain },
);
state.lastSyncAt = Date.now();
state.lastTimings = {
mode: syncMode,
success: false,
error: message,
...(errorCategory ? { errorCategory } : {}),
...(errorDomain ? { errorDomain } : {}),
...(errorCategory && errorDomain === "authority" ? { authorityErrorCategory: errorCategory, authorityErrorDomain: errorDomain } : {}),
desiredEntries: Number(desiredBuildDiagnostics.entryCount || desiredEntries.length),
desiredBuildMs: roundMs(desiredBuildMs),
authorityPurgeMs: roundMs(authorityPurgeMs),
@@ -940,6 +988,8 @@ export async function syncGraphVectorIndex(
stats: state.lastStats,
timings: state.lastTimings,
error: message,
...(errorCategory ? { errorCategory } : {}),
...(errorDomain ? { errorDomain } : {}),
};
if (config.failOpen === false) {
throw error;
@@ -1291,17 +1341,20 @@ export async function findSimilarNodesByText(
throw error;
}
const message = error?.message || String(error) || "Authority Trivium 查询失败";
const errorPatch = getAuthorityDiagnosticsErrorPatch(error);
markAuthorityVectorStateDirty(
graph,
config,
"authority-trivium-query-failed",
`Authority Trivium 查询失败(${message}),已标记待重建`,
errorPatch,
);
recordSearchTimings({
success: false,
reason: "authority-trivium-query-failed",
requestMs: roundMs(nowMs() - requestStartedAt),
error: message,
...errorPatch,
resultCount: 0,
});
if (config.failOpen === false) {
@@ -1406,15 +1459,69 @@ export async function testVectorConnection(config, chatId = "connection-test") {
return { success: false, dimensions: 0, error: validation.error };
}
if (isDirectVectorConfig(config)) {
if (isDirectVectorConfig(config) || isBackendVectorConfig(config)) {
try {
const vec = await embedText("test connection", config);
if (vec) {
return { success: true, dimensions: vec.length, error: "" };
const vectors = await embedBatch(VECTOR_CONNECTION_PROBE_TEXTS, config, {
isQuery: true,
});
const firstVector = vectors.find((vector) => vector && vector.length > 0);
if (firstVector) {
if (isBackendVectorConfig(config)) {
const response = await fetchWithTimeout(
"/api/vector/query",
{
method: "POST",
headers: getRequestHeaders(),
body: JSON.stringify({
collectionId: buildVectorCollectionId(chatId),
searchText: VECTOR_CONNECTION_PROBE_TEXTS[0],
topK: 1,
threshold: 0,
...buildBackendSourceRequest(config),
}),
},
getConfiguredTimeoutMs(config),
);
const payload = await response.text().catch(() => "");
if (!response.ok) {
return {
success: false,
dimensions: firstVector.length,
error: payload || response.statusText,
batchCapable: true,
vectorStoreCapable: false,
mode: config.mode,
source: config.source || "backend",
};
}
}
return {
success: true,
dimensions: firstVector.length,
error: "",
batchCapable: true,
vectorStoreCapable: isBackendVectorConfig(config) ? true : undefined,
mode: config.mode,
source: config.source || "direct",
};
}
return { success: false, dimensions: 0, error: "API 返回空结果" };
return {
success: false,
dimensions: 0,
error: "批量 Embedding API 返回空结果",
batchCapable: false,
mode: config.mode,
source: config.source || "direct",
};
} catch (error) {
return { success: false, dimensions: 0, error: String(error) };
return {
success: false,
dimensions: 0,
error: String(error),
batchCapable: false,
mode: config.mode,
source: config.source || "direct",
};
}
}
@@ -1429,38 +1536,8 @@ export async function testVectorConnection(config, chatId = "connection-test") {
}
}
try {
const response = await fetchWithTimeout(
"/api/vector/query",
{
method: "POST",
headers: getRequestHeaders(),
body: JSON.stringify({
collectionId: buildVectorCollectionId(chatId),
searchText: "test connection",
topK: 1,
threshold: 0,
...buildBackendSourceRequest(config),
}),
},
getConfiguredTimeoutMs(config),
);
const payload = await response.text().catch(() => "");
if (!response.ok) {
return {
success: false,
dimensions: 0,
error: payload || response.statusText,
};
}
return { success: true, dimensions: 0, error: "" };
} catch (error) {
return { success: false, dimensions: 0, error: String(error) };
}
return { success: false, dimensions: 0, error: "未知向量配置" };
}
export function getVectorIndexStats(graph) {
const state = graph?.vectorIndexState;
if (!state) {