diff --git a/host/event-binding.js b/host/event-binding.js index 64c0dad..71dc555 100644 --- a/host/event-binding.js +++ b/host/event-binding.js @@ -239,6 +239,7 @@ export function onChatChangedController(runtime) { runtime.setPendingHistoryRecoveryTimer(null); runtime.setPendingHistoryRecoveryTrigger(""); runtime.clearPendingAutoExtraction?.(); + runtime.clearPendingBackgroundVectorSync?.(); runtime.clearPendingGraphLoadRetry(); runtime.setSkipBeforeCombineRecallUntil(0); runtime.setLastPreGenerationRecallKey(""); diff --git a/index.js b/index.js index 9186bf0..9139997 100644 --- a/index.js +++ b/index.js @@ -257,6 +257,7 @@ import { writePersistedRecallToUserMessage, } from "./retrieval/recall-persistence.js"; import { resolveConfiguredTimeoutMs } from "./runtime/request-timeout.js"; +import { createVectorSyncCoalescer as createImportedVectorSyncCoalescer } from "./runtime/vector-sync-coalescer.js"; import { defaultSettings, getPersistedSettingsSnapshot, @@ -1308,6 +1309,45 @@ const backgroundMaintenanceQueue = typeof createBackgroundMaintenanceQueue === "function" ? createBackgroundMaintenanceQueue() : null; +const backgroundVectorSyncCoalescer = + typeof createImportedVectorSyncCoalescer === "function" + ? createImportedVectorSyncCoalescer() + : { + clear() {}, + getActive() { + return null; + }, + getPending() { + return null; + }, + enqueue(task = {}) { + return { + scheduled: true, + coalesced: false, + task: { + ...(task || {}), + stale: false, + }, + }; + }, + start(task = null) { + return Boolean(task && !task.stale); + }, + complete() { + return true; + }, + drop(task = null) { + if (task) task.stale = true; + return Boolean(task); + }, + isStale(task = null, chatId = "") { + return Boolean( + !task || + task.stale || + (chatId && task.chatId && String(chatId) !== String(task.chatId)), + ); + }, + }; const lastStatusToastAt = {}; let pendingRecallSendIntent = createRecallInputRecord(); let lastRecallSentUserMessage = createRecallInputRecord(); @@ -6967,7 +7007,6 @@ function shouldUseAuthorityGraphStore(settings = getSettings(), capability = aut normalizedSettings.sqlPrimary && normalizedSettings.storageMode !== "local-primary" && normalizedSettings.storageMode !== "off" && - normalizedCapability.serverPrimaryReady && normalizedCapability.storagePrimaryReady ); } @@ -14394,7 +14433,7 @@ function buildBatchPersistenceRecordFromPersistResult(persistResult = null) { if ( accepted && - ["indexeddb", "opfs", "luker-chat-state"].includes( + ["indexeddb", "opfs", "authority-sql", "luker-chat-state"].includes( String(persistResult?.storageTier || ""), ) ) { @@ -14449,6 +14488,7 @@ async function persistGraphToConfiguredDurableTier( if ( persistenceEnvironment.hostProfile === "luker" && + persistenceEnvironment.primaryStorageTier === "luker-chat-state" && canUseHostGraphChatStatePersistence(context) ) { const chatStateResult = await persistGraphToHostChatState(context, { @@ -16777,50 +16817,81 @@ async function syncVectorState({ function scheduleBackgroundVectorSync(task = null, settings = {}) { const normalizedTask = task && typeof task === "object" && !Array.isArray(task) ? task : {}; - const range = - normalizedTask.range && - Number.isFinite(Number(normalizedTask.range.start)) && - Number.isFinite(Number(normalizedTask.range.end)) - ? { - start: Math.floor(Number(normalizedTask.range.start)), - end: Math.floor(Number(normalizedTask.range.end)), - } - : null; - const reason = - String(normalizedTask.reason || "background-vector-sync").trim() || - "background-vector-sync"; + const config = getEmbeddingConfig(); + const chatId = normalizeChatIdCandidate( + normalizedTask.chatId || getCurrentChatId() || graphPersistenceState.chatId, + ); const mode = String( normalizedTask.mode || resolveMaintenancePostProcessConcurrency(settings).mode || "balanced", ).trim() || "balanced"; - return enqueueBackgroundMaintenanceTask( + const coalesced = backgroundVectorSyncCoalescer.enqueue({ + ...normalizedTask, + chatId, + modelScope: getVectorModelScope(config), + mode, + reason: + String(normalizedTask.reason || "background-vector-sync").trim() || + "background-vector-sync", + }); + const scheduledTask = coalesced.task; + + if (!coalesced.scheduled) { + return { + queued: true, + coalesced: true, + id: scheduledTask.id, + snapshot: updateBackgroundMaintenanceQueueState( + typeof backgroundMaintenanceQueue?.getSnapshot === "function" + ? backgroundMaintenanceQueue.getSnapshot() + : null, + ), + }; + } + + const queuedResult = enqueueBackgroundMaintenanceTask( "vector-sync", async () => { - setLastVectorStatus( - "后台向量同步中", - `${mode} 模式 · 正在同步提取后的向量索引`, - "running", - { syncRuntime: false }, - ); - const result = await syncVectorState({ range }); - if (result?.aborted) { - throw createAbortError(result.error || "后台向量同步已终止"); + backgroundVectorSyncCoalescer.start(scheduledTask); + try { + const activeChatId = normalizeChatIdCandidate(getCurrentChatId()); + if (backgroundVectorSyncCoalescer.isStale(scheduledTask, activeChatId)) { + return { skipped: true, reason: "stale-background-vector-sync" }; + } + setLastVectorStatus( + "后台向量同步中", + `${scheduledTask.mode} 模式 · 正在同步提取后的向量索引`, + "running", + { syncRuntime: false }, + ); + const result = await syncVectorState({ range: scheduledTask.range }); + if (result?.aborted) { + throw createAbortError(result.error || "后台向量同步已终止"); + } + if (result?.error) { + throw new Error(result.error); + } + saveGraphToChat({ reason: scheduledTask.reason }); + return result; + } finally { + backgroundVectorSyncCoalescer.complete(scheduledTask); } - if (result?.error) { - throw new Error(result.error); - } - saveGraphToChat({ reason }); - return result; }, settings, { - id: String(normalizedTask.id || ""), + id: scheduledTask.id, }, ); + if (queuedResult?.queued !== true) { + backgroundVectorSyncCoalescer.drop?.( + scheduledTask, + queuedResult?.reason || "background-vector-sync-queue-rejected", + ); + } + return queuedResult; } - function hasPlanCommitChanges(planCommit = null) { if (!planCommit || typeof planCommit !== "object") return false; return [ @@ -22349,6 +22420,7 @@ function onChatChanged() { clearGenerationRecallTransactionsForChat, clearInjectionState, clearPendingAutoExtraction, + clearPendingBackgroundVectorSync: () => backgroundVectorSyncCoalescer.clear("chat-changed"), clearPendingGraphLoadRetry, clearPendingHistoryMutationChecks, clearCurrentGenerationTrivialSkip, diff --git a/manifest.json b/manifest.json index a5993d2..ba5f5f2 100644 --- a/manifest.json +++ b/manifest.json @@ -6,6 +6,6 @@ "js": "index.js", "css": "style.css", "author": "Youzini", - "version": "6.3.0", + "version": "6.3.3", "homePage": "https://github.com/Youzini-afk/ST-Bionic-Memory-Ecology" } diff --git a/runtime/authority-capabilities.js b/runtime/authority-capabilities.js index e9cc30c..4c1bee0 100644 --- a/runtime/authority-capabilities.js +++ b/runtime/authority-capabilities.js @@ -162,6 +162,26 @@ function readPayloadMessage(payload = {}, fallback = "") { return String(payload.error || payload.message || payload.reason || fallback || ""); } +function classifyAuthorityProbeStatus(status = 0, payload = null) { + const payloadCategory = String(payload?.category || "").trim(); + if (payloadCategory) return payloadCategory; + const numericStatus = Number(status || 0); + if (numericStatus === 408) return "timeout"; + if (numericStatus === 401 || numericStatus === 403) return "permission"; + if (numericStatus === 413) return "payload-too-large"; + if (numericStatus === 429) return "rate-limit"; + if (numericStatus >= 500) return "server"; + if (numericStatus >= 400) return "validation"; + return ""; +} + +function classifyAuthorityProbeError(error = null) { + const category = String(error?.category || error?.errorCategory || "").trim(); + if (category) return category; + if (String(error?.name || "") === "AbortError") return "timeout"; + return error ? "network" : ""; +} + function buildAuthorityPermissionEvaluateRequests(settings = {}, readiness = {}, options = {}) { const requests = []; const sqlTarget = String(options.sqlTarget || settings.sqlTarget || "default"); @@ -198,6 +218,8 @@ async function verifyAuthorityDataPlane(baseUrl, fetchImpl, headers, settings = reason: initStatus === 401 || initStatus === 403 ? "session-init-denied" : "session-init-failed", lastError: readPayloadMessage(initPayload, `HTTP ${initStatus || "unknown"}`), status: initStatus, + errorCategory: classifyAuthorityProbeStatus(initStatus, initPayload), + errorDomain: "authority", }; } @@ -231,6 +253,8 @@ async function verifyAuthorityDataPlane(baseUrl, fetchImpl, headers, settings = reason: currentStatus === 401 || currentStatus === 403 ? "session-invalid" : "session-current-failed", lastError: readPayloadMessage(currentPayload, `HTTP ${currentStatus || "unknown"}`), status: currentStatus, + errorCategory: classifyAuthorityProbeStatus(currentStatus, currentPayload), + errorDomain: "authority", }; } @@ -259,6 +283,8 @@ async function verifyAuthorityDataPlane(baseUrl, fetchImpl, headers, settings = reason: permissionStatus === 401 || permissionStatus === 403 ? "permission-denied" : "permission-evaluate-failed", lastError: readPayloadMessage(permissionPayload, `HTTP ${permissionStatus || "unknown"}`), status: permissionStatus, + errorCategory: classifyAuthorityProbeStatus(permissionStatus, permissionPayload), + errorDomain: "authority", }; } @@ -408,6 +434,8 @@ export function createDefaultAuthorityCapabilityState(overrides = {}) { missingFeatures: ["sql.query", "sql.mutation", "trivium.search", "jobs", "blob-or-private-files"], reason: "not-probed", lastError: "", + errorCategory: "", + errorDomain: "", endpoint: "", status: 0, latencyMs: 0, @@ -459,6 +487,8 @@ export function normalizeAuthorityCapabilityState(input = {}, settings = {}) { missingFeatures, reason: String(source.reason || (healthy ? "ok" : "not-ready")), lastError: String(source.lastError || ""), + errorCategory: String(source.errorCategory || ""), + errorDomain: String(source.errorDomain || ""), endpoint: String(source.endpoint || ""), status: clampInteger(source.status, 0, 0, 999), latencyMs: Math.max(0, Number(source.latencyMs) || 0), @@ -547,6 +577,7 @@ export async function probeAuthorityCapabilities(options = {}) { let lastError = ""; let lastStatus = 0; + let lastErrorCategory = ""; for (const endpoint of buildAuthorityProbeUrls(settings.baseUrl)) { const startedAt = readNowMs(); try { @@ -555,6 +586,7 @@ export async function probeAuthorityCapabilities(options = {}) { const status = Number(response?.status || 0); lastStatus = status; if (status === 404) continue; + const errorPayload = response?.ok ? null : await readResponsePayload(response); if (status === 401 || status === 403) { return normalizeAuthorityCapabilityState( { @@ -563,7 +595,9 @@ export async function probeAuthorityCapabilities(options = {}) { sessionReady: false, permissionReady: false, reason: "permission-denied", - lastError: `HTTP ${status}`, + lastError: readPayloadMessage(errorPayload, `HTTP ${status}`), + errorCategory: classifyAuthorityProbeStatus(status, errorPayload), + errorDomain: "authority", endpoint, status, latencyMs: normalizeLatencyMs(startedAt, finishedAt), @@ -579,7 +613,9 @@ export async function probeAuthorityCapabilities(options = {}) { installed: status > 0, healthy: false, reason: "http-error", - lastError: `HTTP ${status || "unknown"}`, + lastError: readPayloadMessage(errorPayload, `HTTP ${status || "unknown"}`), + errorCategory: classifyAuthorityProbeStatus(status, errorPayload), + errorDomain: "authority", endpoint, status, latencyMs: normalizeLatencyMs(startedAt, finishedAt), @@ -605,12 +641,16 @@ export async function probeAuthorityCapabilities(options = {}) { let reason = missingFeatures.length ? "missing-required-features" : "ok"; let dataPlaneLastError = ""; let dataPlaneStatus = status; + let dataPlaneErrorCategory = ""; + let dataPlaneErrorDomain = ""; if (healthy) { const verified = await verifyAuthorityDataPlane(settings.baseUrl, fetchImpl, headers, settings, readiness, options); sessionReady = verified.sessionReady; permissionReady = verified.permissionReady; dataPlaneStatus = Number(verified.status || status || 0); dataPlaneLastError = String(verified.lastError || ""); + dataPlaneErrorCategory = String(verified.errorCategory || ""); + dataPlaneErrorDomain = String(verified.errorDomain || ""); if (verified.reason && verified.reason !== "ok") { reason = verified.reason; } @@ -627,6 +667,8 @@ export async function probeAuthorityCapabilities(options = {}) { missingFeatures, reason, lastError: dataPlaneLastError, + errorCategory: dataPlaneErrorCategory, + errorDomain: dataPlaneErrorDomain, endpoint, status: dataPlaneStatus, latencyMs: normalizeLatencyMs(startedAt, finishedAt), @@ -637,6 +679,8 @@ export async function probeAuthorityCapabilities(options = {}) { ); } catch (error) { lastError = error?.message || String(error); + lastStatus = Number(error?.status || lastStatus || 0); + lastErrorCategory = classifyAuthorityProbeError(error); } } @@ -646,6 +690,8 @@ export async function probeAuthorityCapabilities(options = {}) { healthy: false, reason: lastStatus === 404 ? "not-installed" : "probe-failed", lastError, + errorCategory: lastErrorCategory || classifyAuthorityProbeStatus(lastStatus), + errorDomain: lastErrorCategory || lastStatus ? "authority" : "", status: lastStatus, lastProbeAt: nowMs, updatedAt: new Date(nowMs).toISOString(), diff --git a/runtime/vector-sync-coalescer.js b/runtime/vector-sync-coalescer.js new file mode 100644 index 0000000..9dd5203 --- /dev/null +++ b/runtime/vector-sync-coalescer.js @@ -0,0 +1,133 @@ +export function normalizeVectorSyncRange(range = null) { + if ( + range && + Number.isFinite(Number(range.start)) && + Number.isFinite(Number(range.end)) + ) { + const start = Math.floor(Number(range.start)); + const end = Math.floor(Number(range.end)); + return { + start: Math.min(start, end), + end: Math.max(start, end), + }; + } + return null; +} + +export function mergeVectorSyncRange(current = null, next = null) { + const currentRange = normalizeVectorSyncRange(current); + const nextRange = normalizeVectorSyncRange(next); + if (!currentRange || !nextRange) return null; + return { + start: Math.min(currentRange.start, nextRange.start), + end: Math.max(currentRange.end, nextRange.end), + }; +} + +function createTaskRecord(task = {}) { + const id = String(task.id || `vector-sync:${Date.now()}`); + return { + id, + chatId: String(task.chatId || "").trim(), + modelScope: String(task.modelScope || "").trim(), + range: normalizeVectorSyncRange(task.range), + reason: + String(task.reason || "background-vector-sync").trim() || + "background-vector-sync", + mode: String(task.mode || "balanced").trim() || "balanced", + stale: false, + requestedAt: Date.now(), + updatedAt: Date.now(), + }; +} + +function canMergeTask(left = null, right = null) { + return Boolean( + left && + right && + !left.stale && + left.chatId === right.chatId && + left.modelScope === right.modelScope, + ); +} + +function mergeTaskInto(target, incoming) { + target.range = mergeVectorSyncRange(target.range, incoming.range); + target.reason = + target.reason === incoming.reason + ? target.reason + : `${target.reason}+${incoming.reason}`; + target.mode = incoming.mode || target.mode; + target.updatedAt = Date.now(); + return target; +} + +function markStale(task = null, reason = "stale") { + if (!task) return; + task.stale = true; + task.clearReason = String(reason || "stale"); +} + +export function createVectorSyncCoalescer() { + let active = null; + let pending = null; + + return { + clear(reason = "clear") { + markStale(active, reason); + markStale(pending, reason); + active = null; + pending = null; + }, + getActive() { + return active; + }, + getPending() { + return pending; + }, + enqueue(task = {}) { + const incoming = createTaskRecord(task); + if (canMergeTask(active, incoming)) { + if (canMergeTask(pending, incoming)) { + mergeTaskInto(pending, incoming); + return { scheduled: false, coalesced: true, task: pending }; + } + markStale(pending, "replaced"); + pending = incoming; + return { scheduled: true, coalesced: false, task: pending }; + } + if (canMergeTask(pending, incoming)) { + mergeTaskInto(pending, incoming); + return { scheduled: false, coalesced: true, task: pending }; + } + markStale(pending, "replaced"); + pending = incoming; + return { scheduled: true, coalesced: false, task: pending }; + }, + start(task = null) { + if (!task || task.stale) return false; + if (pending === task) pending = null; + active = task; + return true; + }, + complete(task = null) { + if (task && active !== task) return false; + active = null; + return true; + }, + drop(task = null, reason = "dropped") { + if (!task) return false; + const target = pending === task ? pending : active === task ? active : null; + if (!target) return false; + markStale(target, reason); + if (pending === task) pending = null; + if (active === task) active = null; + return true; + }, + isStale(task = null, chatId = "") { + if (!task || task.stale) return true; + const currentChatId = String(chatId || "").trim(); + return Boolean(currentChatId && task.chatId && currentChatId !== task.chatId); + }, + }; +} diff --git a/sync/authority-graph-store.js b/sync/authority-graph-store.js index 1eb0216..b5c99df 100644 --- a/sync/authority-graph-store.js +++ b/sync/authority-graph-store.js @@ -18,6 +18,9 @@ const DEFAULT_AUTHORITY_SQL_DATABASE = "default"; const AUTHORITY_SQL_QUERY_ENDPOINT = "/sql/query"; const AUTHORITY_SQL_EXEC_ENDPOINT = "/sql/exec"; const AUTHORITY_SQL_TRANSACTION_ENDPOINT = "/sql/transaction"; +const AUTHORITY_SQL_TRANSACTION_BATCH_SIZE = 150; +const AUTHORITY_SQL_TRANSACTION_MAX_REQUEST_BYTES = 1024 * 1024; +const AUTHORITY_SQL_TRANSACTION_SAFE_REQUEST_BYTES = 512 * 1024; const AUTHORITY_TABLES = Object.freeze({ meta: "st_bme_graph_meta", @@ -85,6 +88,108 @@ function estimatePersistPayloadBytes(value = null) { } } +function measureJsonBytes(value = null) { + let json = ""; + try { + json = JSON.stringify(value ?? null); + } catch { + json = ""; + } + if (typeof TextEncoder === "function") { + return new TextEncoder().encode(json).byteLength; + } + return json.length * 3; +} + +function normalizeSqlTransactionByteBudget(value) { + const parsed = Number(value); + if (!Number.isFinite(parsed) || parsed <= 0) return AUTHORITY_SQL_TRANSACTION_SAFE_REQUEST_BYTES; + return Math.max(1024, Math.floor(parsed)); +} + +function isAuthorityPayloadTooLargeError(error = null) { + const status = Number(error?.status || error?.payload?.status || 0); + const category = String(error?.category || error?.payload?.category || "").toLowerCase(); + const code = String(error?.code || error?.payload?.code || error?.payload?.reason || "").toLowerCase(); + const message = String(error?.message || error?.payload?.error || error?.payload?.message || "").toLowerCase(); + return ( + status === 413 || + category === "payload-too-large" || + category === "limit" || + code.includes("length_limit") || + code.includes("payload") || + message.includes("length limit") || + message.includes("payload too large") || + message.includes("request entity too large") + ); +} + +function createTerminalAuthoritySqlPayloadError(statement = null, cause = null) { + const estimatedBytes = estimateAuthoritySqlTransactionRequestBytes([statement], DEFAULT_AUTHORITY_SQL_DATABASE); + const error = new Error( + `Authority SQL transaction payload is too large for a single graph persistence statement (${estimatedBytes} bytes estimated); refusing endless retry`, + ); + error.name = "AuthoritySqlPayloadTooLargeError"; + error.status = Number(cause?.status || 413); + error.code = "authority_sql_payload_too_large"; + error.category = "payload-too-large"; + error.terminal = true; + error.nonRetryable = true; + error.estimatedBytes = estimatedBytes; + error.cause = cause; + return error; +} + +function normalizeAuthorityTransactionStatement(statement = {}) { + const positional = convertNamedParamsToPositional(String(statement?.sql || ""), statement?.params || {}); + return { + statement: positional.sql, + params: positional.params, + }; +} + +function estimateAuthoritySqlTransactionRequestBytes(statements = [], database = DEFAULT_AUTHORITY_SQL_DATABASE) { + return measureJsonBytes({ + database: normalizeRecordId(database) || DEFAULT_AUTHORITY_SQL_DATABASE, + statements: toArray(statements) + .filter((statement) => statement?.sql) + .map(normalizeAuthorityTransactionStatement), + }); +} + +function buildAuthoritySqlTransactionChunks(statements = [], options = {}) { + const normalizedStatements = toArray(statements).filter((statement) => statement?.sql); + const maxStatements = Math.max(1, Math.floor(Number(options.maxStatements) || AUTHORITY_SQL_TRANSACTION_BATCH_SIZE)); + const maxBytes = normalizeSqlTransactionByteBudget(options.maxBytes); + const database = normalizeRecordId(options.database) || DEFAULT_AUTHORITY_SQL_DATABASE; + const chunks = []; + let current = []; + + const pushCurrent = () => { + if (current.length) { + chunks.push(current); + current = []; + } + }; + + for (const statement of normalizedStatements) { + if (!current.length) { + current.push(statement); + continue; + } + + const candidate = [...current, statement]; + const candidateBytes = estimateAuthoritySqlTransactionRequestBytes(candidate, database); + if (candidate.length > maxStatements || candidateBytes > maxBytes) { + pushCurrent(); + } + current.push(statement); + } + + pushCurrent(); + return chunks; +} + function toPlainData(value, fallbackValue = null) { if (value == null) return fallbackValue; if (typeof globalThis.structuredClone === "function") { @@ -390,13 +495,7 @@ export class AuthoritySqlHttpClient { database: this.database, statements: toArray(statements) .filter((statement) => statement?.sql) - .map((statement) => { - const positional = convertNamedParamsToPositional(String(statement.sql || ""), statement.params || {}); - return { - statement: positional.sql, - params: positional.params, - }; - }), + .map(normalizeAuthorityTransactionStatement), }); } @@ -1039,15 +1138,11 @@ export class AuthorityGraphStore { const normalizedStatements = toArray(statements).filter((statement) => statement?.sql); if (!normalizedStatements.length) return null; - const BATCH_SIZE = 150; if (typeof this.sqlClient?.transaction === "function") { - if (normalizedStatements.length <= BATCH_SIZE) { - return await this.sqlClient.transaction(normalizedStatements); - } let lastResult = null; - for (let i = 0; i < normalizedStatements.length; i += BATCH_SIZE) { - const batch = normalizedStatements.slice(i, i + BATCH_SIZE); - lastResult = await this.sqlClient.transaction(batch); + const chunks = this._buildTransactionChunks(normalizedStatements); + for (const batch of chunks) { + lastResult = await this._executeTransactionChunk(batch); } return lastResult; } @@ -1078,6 +1173,42 @@ export class AuthorityGraphStore { return result; } + _buildTransactionChunks(statements = []) { + const budget = normalizeSqlTransactionByteBudget( + this.options?.sqlTransactionMaxBytes ?? this.options?.authoritySqlTransactionMaxBytes, + ); + const maxBytes = Math.min( + AUTHORITY_SQL_TRANSACTION_MAX_REQUEST_BYTES - 64 * 1024, + Math.max(1024, budget), + ); + return buildAuthoritySqlTransactionChunks(statements, { + database: this.sqlClient?.database || DEFAULT_AUTHORITY_SQL_DATABASE, + maxStatements: this.options?.sqlTransactionBatchSize ?? AUTHORITY_SQL_TRANSACTION_BATCH_SIZE, + maxBytes, + }); + } + + async _executeTransactionChunk(batch = []) { + const normalizedBatch = toArray(batch).filter((statement) => statement?.sql); + if (!normalizedBatch.length) return null; + try { + return await this.sqlClient.transaction(normalizedBatch); + } catch (error) { + if (!isAuthorityPayloadTooLargeError(error)) { + throw error; + } + if (normalizedBatch.length <= 1) { + throw createTerminalAuthoritySqlPayloadError(normalizedBatch[0], error); + } + const mid = Math.max(1, Math.floor(normalizedBatch.length / 2)); + const left = normalizedBatch.slice(0, mid); + const right = normalizedBatch.slice(mid); + const leftResult = await this._executeTransactionChunk(left); + const rightResult = await this._executeTransactionChunk(right); + return rightResult ?? leftResult; + } + } + _upsertMetaStatement(key, value, nowMs = Date.now()) { return { sql: `INSERT INTO ${AUTHORITY_TABLES.meta} (chat_id, meta_key, value_json, updated_at) VALUES (:chatId, :key, :valueJson, :updatedAt) ON CONFLICT(chat_id, meta_key) DO UPDATE SET value_json = excluded.value_json, updated_at = excluded.updated_at`, diff --git a/tests/authority-capabilities.mjs b/tests/authority-capabilities.mjs index 376872e..1d399fe 100644 --- a/tests/authority-capabilities.mjs +++ b/tests/authority-capabilities.mjs @@ -176,6 +176,78 @@ const relativeUnavailable = await probeAuthorityCapabilities({ assert.equal(relativeUnavailable.reason, "relative-url-unavailable"); assert.equal(relativeUnavailable.serverPrimaryReady, false); +const permissionDeniedState = await probeAuthorityCapabilities({ + settings: defaultSettings, + allowRelativeUrl: true, + nowMs: 3100, + fetchImpl: async () => ({ + ok: false, + status: 403, + async json() { + return { error: "permission denied" }; + }, + }), +}); +assert.equal(permissionDeniedState.reason, "permission-denied"); +assert.equal(permissionDeniedState.errorCategory, "permission"); +assert.equal(permissionDeniedState.errorDomain, "authority"); + +const rateLimitedState = await probeAuthorityCapabilities({ + settings: defaultSettings, + allowRelativeUrl: true, + nowMs: 3200, + fetchImpl: async () => ({ + ok: false, + status: 429, + async json() { + return { error: "slow down" }; + }, + }), +}); +assert.equal(rateLimitedState.reason, "http-error"); +assert.equal(rateLimitedState.errorCategory, "rate-limit"); +assert.equal(rateLimitedState.errorDomain, "authority"); + +const serverErrorState = await probeAuthorityCapabilities({ + settings: defaultSettings, + allowRelativeUrl: true, + nowMs: 3300, + fetchImpl: async () => ({ + ok: false, + status: 503, + async json() { + return { category: "backpressure", code: "job_queue_full" }; + }, + }), +}); +assert.equal(serverErrorState.reason, "http-error"); +assert.equal(serverErrorState.errorCategory, "backpressure"); +assert.equal(serverErrorState.errorDomain, "authority"); + +const networkFailedState = await probeAuthorityCapabilities({ + settings: defaultSettings, + allowRelativeUrl: true, + nowMs: 3400, + fetchImpl: async () => { + throw new Error("fetch failed"); + }, +}); +assert.equal(networkFailedState.reason, "probe-failed"); +assert.equal(networkFailedState.errorCategory, "network"); +assert.equal(networkFailedState.errorDomain, "authority"); + +const timeoutState = await probeAuthorityCapabilities({ + settings: defaultSettings, + allowRelativeUrl: true, + nowMs: 3500, + fetchImpl: async () => { + throw Object.assign(new Error("aborted"), { name: "AbortError" }); + }, +}); +assert.equal(timeoutState.reason, "probe-failed"); +assert.equal(timeoutState.errorCategory, "timeout"); +assert.equal(timeoutState.errorDomain, "authority"); + // Regression: Authority capability normalization records explicit supported job types from probe payloads. // When a probe payload provides jobs.supportedTypes, normalizeAuthorityCapabilityState should surface // them as supportedJobTypes and set supportedJobTypesKnown = true. diff --git a/tests/authority-graph-store.mjs b/tests/authority-graph-store.mjs index fdad2af..63278cc 100644 --- a/tests/authority-graph-store.mjs +++ b/tests/authority-graph-store.mjs @@ -168,6 +168,18 @@ class MockAuthoritySqlClient { } } +function createLimitError() { + const error = new Error("length limit exceeded"); + error.status = 413; + error.category = "limit"; + error.code = "length_limit_exceeded"; + return error; +} + +function isNodeUpsertStatement(statement = {}) { + return String(statement.sql || "").toLowerCase().includes("insert into st_bme_graph_nodes"); +} + async function testOpenSeedsAuthorityMeta() { const sqlClient = new MockAuthoritySqlClient(); const store = new AuthorityGraphStore("authority-chat-a", { sqlClient }); @@ -390,10 +402,104 @@ async function testConvertNamedParamsToPositional() { assert.deepEqual(r8.params, [5]); } +async function testTransactionBatchingUsesByteBudget() { + const sqlClient = new MockAuthoritySqlClient(); + const batchSizes = []; + const originalTransaction = sqlClient.transaction.bind(sqlClient); + sqlClient.transaction = async (statements = []) => { + if (statements.some(isNodeUpsertStatement)) { + batchSizes.push(statements.length); + } + return await originalTransaction(statements); + }; + const store = new AuthorityGraphStore("authority-chat-byte-budget", { + sqlClient, + sqlTransactionBatchSize: 150, + sqlTransactionMaxBytes: 4096, + }); + const nodes = Array.from({ length: 8 }, (_, index) => ({ + id: `node-${index}`, + type: "event", + text: `payload-${index}-${"測試".repeat(260)}`, + updatedAt: index + 1, + })); + + await store.bulkUpsertNodes(nodes); + + assert.equal((await store.listNodes()).length, nodes.length); + assert.ok(batchSizes.length > 1, "expected large payload to split into multiple transactions"); + assert.ok(batchSizes.every((size) => size < nodes.length), "expected no single node transaction batch to contain all records"); +} + +async function testTransaction413SplitsWithoutRepeatingOversizedBatch() { + const sqlClient = new MockAuthoritySqlClient(); + const attemptedBatchSizes = []; + const originalTransaction = sqlClient.transaction.bind(sqlClient); + sqlClient.transaction = async (statements = []) => { + const nodeBatchSize = statements.filter(isNodeUpsertStatement).length; + if (nodeBatchSize > 0) { + attemptedBatchSizes.push(nodeBatchSize); + if (nodeBatchSize > 2) { + throw createLimitError(); + } + } + return await originalTransaction(statements); + }; + const store = new AuthorityGraphStore("authority-chat-413-split", { + sqlClient, + sqlTransactionBatchSize: 150, + sqlTransactionMaxBytes: 512 * 1024, + }); + const nodes = Array.from({ length: 6 }, (_, index) => ({ + id: `node-${index}`, + type: "event", + text: `payload-${index}`, + updatedAt: index + 1, + })); + + await store.bulkUpsertNodes(nodes); + + assert.equal((await store.listNodes()).length, nodes.length); + assert.equal(attemptedBatchSizes[0], 6); + assert.ok(attemptedBatchSizes.some((size) => size <= 2), "expected oversized request to be split below the failing size"); + assert.ok(attemptedBatchSizes.length <= 10, "expected bounded split attempts instead of an endless retry loop"); +} + +async function testSingleStatement413IsTerminal() { + const sqlClient = new MockAuthoritySqlClient(); + let oversizedAttempts = 0; + sqlClient.transaction = async (statements = []) => { + if (statements.some(isNodeUpsertStatement)) { + oversizedAttempts += 1; + throw createLimitError(); + } + for (const statement of statements) { + await sqlClient.execute(statement.sql, statement.params || {}); + } + return { executed: statements.length }; + }; + const store = new AuthorityGraphStore("authority-chat-single-413", { sqlClient }); + + await assert.rejects( + () => store.bulkUpsertNodes([{ id: "too-large", type: "event", text: "x".repeat(1024), updatedAt: 1 }]), + (error) => { + assert.equal(error.name, "AuthoritySqlPayloadTooLargeError"); + assert.equal(error.nonRetryable, true); + assert.equal(error.terminal, true); + return true; + }, + ); + assert.ok(oversizedAttempts >= 1, "expected the oversized record to be attempted at least once"); + assert.ok(oversizedAttempts <= 4, "single oversized record must not retry forever"); +} + await testConvertNamedParamsToPositional(); await testOpenSeedsAuthorityMeta(); await testImportCommitAndExportSnapshot(); await testPruneAndClear(); await testHttpSqlClientBoundary(); +await testTransactionBatchingUsesByteBudget(); +await testTransaction413SplitsWithoutRepeatingOversizedBatch(); +await testSingleStatement413IsTerminal(); console.log(`${PREFIX} all tests passed`); diff --git a/tests/authority-vector-primary.mjs b/tests/authority-vector-primary.mjs index 9cfd235..ae56de9 100644 --- a/tests/authority-vector-primary.mjs +++ b/tests/authority-vector-primary.mjs @@ -1,5 +1,6 @@ import assert from "node:assert/strict"; import { addEdge, addNode, createEdge, createEmptyGraph, createNode } from "../graph/graph.js"; +import { AuthorityHttpError } from "../runtime/authority-http-client.js"; import { installResolveHooks, toDataModuleUrl, @@ -33,7 +34,10 @@ const { normalizeAuthorityVectorConfig, queryAuthorityTriviumNeighbors, } = await import("../vector/authority-vector-primary-adapter.js"); -const { findSimilarNodesByText: findSimilarNodesByTextFromIndex, syncGraphVectorIndex: syncGraphVectorIndexFromIndex } = await import("../vector/vector-index.js"); +const { + findSimilarNodesByText: findSimilarNodesByTextFromIndex, + syncGraphVectorIndex: syncGraphVectorIndexFromIndex, +} = await import("../vector/vector-index.js"); function createAuthorityVectorGraph() { const graph = createEmptyGraph(); @@ -66,7 +70,7 @@ function createAuthorityVectorGraph() { return { graph, first, second }; } -function createMockTriviumClient({ failBulkUpsert = false } = {}) { +function createMockTriviumClient({ failBulkUpsert = false, failSearch = false } = {}) { const calls = []; return { calls, @@ -88,7 +92,11 @@ function createMockTriviumClient({ failBulkUpsert = false } = {}) { async bulkUpsert(payload) { calls.push(["bulkUpsert", payload]); if (failBulkUpsert) { - throw new Error("trivium-down"); + throw new AuthorityHttpError("trivium-down", { + status: 503, + category: "server", + path: "/trivium/bulk-upsert", + }); } return { ok: true, upserted: payload.items?.length || 0 }; }, @@ -102,6 +110,13 @@ function createMockTriviumClient({ failBulkUpsert = false } = {}) { }, async search(payload) { calls.push(["search", payload]); + if (failSearch) { + throw new AuthorityHttpError("trivium search denied", { + status: 403, + category: "permission", + path: "/trivium/search", + }); + } return { results: [ { nodeId: "node-b", score: 0.91 }, @@ -234,9 +249,77 @@ assert.equal(isAuthorityVectorConfig(config), true); assert.equal(graph.vectorIndexState.mode, "authority"); assert.equal(graph.vectorIndexState.dirty, true); assert.equal(graph.vectorIndexState.dirtyReason, "authority-trivium-sync-failed"); + assert.equal(result.errorCategory, "server"); + assert.equal(result.errorDomain, "authority"); + assert.equal(result.timings.errorCategory, "server"); + assert.equal(result.timings.authorityErrorCategory, "server"); + assert.equal(graph.vectorIndexState.lastErrorCategory, "server"); + assert.equal(graph.vectorIndexState.lastErrorDomain, "authority"); + assert.equal(result.timings.authorityDiagnostics.upsert.errorCategory, "server"); + assert.equal(result.timings.authorityDiagnostics.upsert.chunks[0].errorCategory, "server"); assert.match(graph.vectorIndexState.lastWarning, /Authority Trivium 同步失败/); } +{ + const previousOverrides = globalThis.__stBmeTestOverrides; + globalThis.__stBmeTestOverrides = { + embedding: { + async embedBatch(texts = []) { + return texts.map(() => null); + }, + async embedText() { + return null; + }, + }, + }; + try { + const { graph } = createAuthorityVectorGraph(); + graph.nodes.forEach((node) => { + node.embedding = null; + }); + const triviumClient = createMockTriviumClient(); + const result = await syncGraphVectorIndexFromIndex(graph, config, { + chatId: "chat-authority-vector", + purge: true, + triviumClient, + }); + assert.match(result.error, /Embedding provider failed/); + assert.doesNotMatch(result.error, /Authority Trivium embedding failed/); + assert.equal(result.errorCategory, "embedding-provider"); + assert.equal(result.errorDomain, "embedding"); + assert.equal(graph.vectorIndexState.dirtyReason, "embedding-provider-sync-failed"); + assert.equal(graph.vectorIndexState.lastErrorCategory, "embedding-provider"); + assert.equal(graph.vectorIndexState.lastErrorDomain, "embedding"); + assert.match(graph.vectorIndexState.lastWarning, /Embedding provider 同步失败/); + assert.equal(triviumClient.calls.some(([name]) => name === "bulkUpsert"), false); + } finally { + globalThis.__stBmeTestOverrides = previousOverrides; + } +} + +{ + const { graph, first, second } = createAuthorityVectorGraph(); + const triviumClient = createMockTriviumClient({ failSearch: true }); + const queryConfig = { ...config, triviumClient }; + await syncGraphVectorIndexFromIndex(graph, queryConfig, { + chatId: "chat-authority-vector", + purge: true, + triviumClient, + }); + const results = await findSimilarNodesByTextFromIndex( + graph, + "archive door", + queryConfig, + 5, + [first, second], + ); + assert.deepEqual(results, []); + assert.equal(graph.vectorIndexState.lastSearchTimings.errorCategory, "permission"); + assert.equal(graph.vectorIndexState.lastSearchTimings.authorityErrorCategory, "permission"); + assert.equal(graph.vectorIndexState.lastErrorCategory, "permission"); + assert.equal(graph.vectorIndexState.lastErrorDomain, "authority"); +} + { const triviumClient = createMockTriviumClient(); const queryConfig = { ...config, triviumClient }; diff --git a/tests/embedding-batch.mjs b/tests/embedding-batch.mjs new file mode 100644 index 0000000..cf62782 --- /dev/null +++ b/tests/embedding-batch.mjs @@ -0,0 +1,80 @@ +import assert from "node:assert/strict"; +import { installResolveHooks, toDataModuleUrl } from "./helpers/register-hooks-compat.mjs"; + +installResolveHooks([ + { specifiers: ["../../../../../script.js"], url: toDataModuleUrl("export function getRequestHeaders() { return {}; }") }, + { specifiers: ["../../../../extensions.js"], url: toDataModuleUrl("export const extension_settings = { st_bme: {} };") }, +]); + +const { embedBatch } = await import("../vector/embedding.js"); + +function jsonResponse(payload) { + return new Response(JSON.stringify(payload), { status: 200, headers: { "Content-Type": "application/json" } }); +} + +async function withFetch(handler, fn) { + const previousFetch = globalThis.fetch; + globalThis.fetch = handler; + try { return await fn(); } finally { globalThis.fetch = previousFetch; } +} + +const plain = (vectors) => vectors.map((vector) => (vector ? Array.from(vector) : null)); + +{ + const calls = []; + await withFetch(async (_url, options = {}) => { + const body = JSON.parse(String(options.body || "{}")); + calls.push(body); + return jsonResponse({ data: body.input.map((text, index) => ({ index, embedding: [String(text).length, index] })) }); + }, async () => { + const vectors = await embedBatch(["alpha", "beta", "gamma"], { mode: "direct", apiUrl: "https://example.com/v1", apiKey: "sk-test", model: "test-embedding", embeddingBatchSize: 2 }); + assert.deepEqual(plain(vectors), [[5, 0], [4, 1], [5, 0]]); + }); + assert.deepEqual(calls.map((call) => call.input), [["alpha", "beta"], ["gamma"]]); +} + +{ + const calls = []; + await withFetch(async (_url, options = {}) => { + const body = JSON.parse(String(options.body || "{}")); + calls.push(body); + if (Array.isArray(body.input)) return new Response("batch schema rejected", { status: 400 }); + return jsonResponse({ data: [{ index: 0, embedding: [String(body.input).length, 9] }] }); + }, async () => { + const vectors = await embedBatch(["first", "second"], { mode: "direct", apiUrl: "https://example.com/v1/embeddings", model: "test-embedding", embeddingBatchSize: 2 }); + assert.deepEqual(plain(vectors), [[5, 9], [6, 9]]); + }); + assert.deepEqual(calls.map((call) => call.input), [["first", "second"], "first", "second"]); +} + +{ + const calls = []; + await withFetch(async (_url, options = {}) => { + const body = JSON.parse(String(options.body || "{}")); + calls.push(body); + if (Array.isArray(body.texts)) return new Response("backend batch rejected", { status: 400 }); + return jsonResponse({ vector: [String(body.text).length, 3] }); + }, async () => { + const vectors = await embedBatch(["uno", "dos"], { mode: "backend", source: "openai", model: "text-embedding-3-small", embeddingBatchSize: 2 }); + assert.deepEqual(plain(vectors), [[3, 3], [3, 3]]); + }); + assert.deepEqual(calls.map((call) => [call.texts, call.text]), [[["uno", "dos"], undefined], [undefined, "uno"], [undefined, "dos"]]); +} + +{ + const calls = []; + await withFetch(async (_url, options = {}) => { + const body = JSON.parse(String(options.body || "{}")); + calls.push(body); + if (Array.isArray(body.input)) { + return jsonResponse({ data: [{ index: 0, embedding: [1, 1] }] }); + } + return jsonResponse({ data: [{ index: 0, embedding: [String(body.input).length, 7] }] }); + }, async () => { + const vectors = await embedBatch(["kept", "fallback"], { mode: "direct", apiUrl: "https://example.com/v1", model: "test-embedding", embeddingBatchSize: 2 }); + assert.deepEqual(plain(vectors), [[1, 1], [8, 7]]); + }); + assert.deepEqual(calls.map((call) => call.input), [["kept", "fallback"], "fallback"]); +} + +console.log("embedding-batch tests passed"); diff --git a/tests/graph-persistence.mjs b/tests/graph-persistence.mjs index d703514..d5383d2 100644 --- a/tests/graph-persistence.mjs +++ b/tests/graph-persistence.mjs @@ -482,6 +482,19 @@ async function createGraphPersistenceHarness({ imported: importResult.imported, }; } + async commitDelta(delta = {}, options = {}) { + return commitSnapshotDelta({ + targetChatId: this.chatId, + delta, + options, + getSnapshot: getAuthoritySnapshotForChat, + setSnapshot: setAuthoritySnapshotForChat, + metaPatch: { + storagePrimary: AUTHORITY_GRAPH_STORE_KIND, + storageMode: AUTHORITY_GRAPH_STORE_MODE, + }, + }); + } async isEmpty() { const snapshot = getAuthoritySnapshotForChat(this.chatId); const nodes = Array.isArray(snapshot?.nodes) ? snapshot.nodes.length : 0; @@ -517,9 +530,17 @@ async function createGraphPersistenceHarness({ } } - function commitIndexedDbDelta(targetChatId = "", delta = {}, options = {}) { + function commitSnapshotDelta({ + targetChatId = "", + delta = {}, + options = {}, + getSnapshot, + setSnapshot, + metaPatch = {}, + } = {}) { const normalizedChatId = String(targetChatId || ""); - const currentSnapshot = getIndexedDbSnapshotForChat(normalizedChatId); + const currentSnapshot = + typeof getSnapshot === "function" ? getSnapshot(normalizedChatId) : null; const now = Date.now(); const nodeMap = new Map( @@ -580,6 +601,7 @@ async function createGraphPersistenceHarness({ meta: { ...(currentSnapshot?.meta || {}), ...runtimeMetaPatch, + ...(metaPatch && typeof metaPatch === "object" ? structuredClone(metaPatch) : {}), chatId: normalizedChatId, revision: nextRevision, lastModified: now, @@ -598,9 +620,9 @@ async function createGraphPersistenceHarness({ state: nextState, }; - setIndexedDbSnapshotForChat(normalizedChatId, nextSnapshot); - runtimeContext.__indexedDbSnapshot = - getIndexedDbSnapshotForChat(normalizedChatId); + if (typeof setSnapshot === "function") { + setSnapshot(normalizedChatId, nextSnapshot); + } return { revision: nextRevision, @@ -620,6 +642,20 @@ async function createGraphPersistenceHarness({ }; } + function commitIndexedDbDelta(targetChatId = "", delta = {}, options = {}) { + const result = commitSnapshotDelta({ + targetChatId, + delta, + options, + getSnapshot: getIndexedDbSnapshotForChat, + setSnapshot: setIndexedDbSnapshotForChat, + }); + runtimeContext.__indexedDbSnapshot = getIndexedDbSnapshotForChat( + String(targetChatId || ""), + ); + return result; + } + const runtimeContext = { console, Date, @@ -1608,6 +1644,7 @@ result = { retryPendingGraphPersist, persistExtractionBatchResult, shouldUseAuthorityJobs, + shouldUseAuthorityGraphStore, onRebuildLocalCacheFromLukerSidecar, saveGraphToIndexedDb, cloneGraphForPersistence, @@ -4093,6 +4130,81 @@ result = { ); } +{ + const harness = await createGraphPersistenceHarness({ + chatId: "chat-authority-sql-storage-only", + globalChatId: "chat-authority-sql-storage-only", + }); + harness.runtimeContext.extension_settings[MODULE_NAME] = { + authorityEnabled: "on", + authorityPrimaryWhenAvailable: true, + authorityStorageMode: "server-primary", + authoritySqlPrimary: true, + }; + const capability = harness.api.setAuthorityCapabilityState({ + installed: true, + healthy: true, + sessionReady: true, + permissionReady: true, + features: ["sql.query", "sql.mutation"], + reason: "missing-required-features", + lastProbeAt: Date.now(), + }); + + assert.equal( + capability.serverPrimaryReady, + false, + "缺少 jobs/blob/trivium 时整体 Authority server-primary 应保持降级显示", + ); + assert.equal( + capability.storagePrimaryReady, + true, + "SQL 存储能力已就绪时图谱主存储应可用", + ); + assert.equal( + harness.api.shouldUseAuthorityGraphStore( + harness.runtimeContext.extension_settings[MODULE_NAME], + capability, + ), + true, + "Authority SQL 图谱主存储不应被 jobs/blob/trivium 附属能力误伤", + ); + assert.equal( + harness.api.shouldUseAuthorityJobs({ mode: "authority", source: "authority-trivium" }), + false, + "jobs 不可用时 Authority job 提交仍应被禁用", + ); + + harness.api.setCurrentGraph( + stampPersistedGraph( + createMeaningfulGraph("chat-authority-sql-storage-only", "authority-sql-storage-only"), + { + revision: 6, + integrity: "chat-authority-sql-storage-only", + chatId: "chat-authority-sql-storage-only", + reason: "authority-sql-storage-only-seed", + }, + ), + ); + + const persistResult = await harness.api.persistExtractionBatchResult({ + reason: "authority-sql-storage-only-persist", + lastProcessedAssistantFloor: 6, + }); + + assert.equal(persistResult.accepted, true); + assert.equal(persistResult.storageTier, "authority-sql"); + assert.equal(persistResult.acceptedBy, "authority-sql"); + assert.equal( + Number( + harness.api.getAuthoritySnapshotForChat("chat-authority-sql-storage-only")?.meta + ?.revision || 0, + ), + persistResult.revision, + "SQL-only Authority capability should still perform accepted Authority SQL graph persistence", + ); +} + { const harness = await createGraphPersistenceHarness({ chatId: "chat-b", @@ -4587,6 +4699,88 @@ result = { ); } +{ + const chatId = "chat-luker-authority-sql-primary"; + const persistenceChatId = "meta-luker-authority-sql-primary"; + const harness = await createGraphPersistenceHarness({ + chatId, + globalChatId: chatId, + characterId: "char-luker-authority-sql", + chatMetadata: { + integrity: persistenceChatId, + }, + }); + harness.runtimeContext.Luker = { + getContext() { + return harness.runtimeContext.__chatContext; + }, + }; + harness.runtimeContext.extension_settings[MODULE_NAME] = { + authorityEnabled: "on", + authorityPrimaryWhenAvailable: true, + authorityStorageMode: "server-primary", + authoritySqlPrimary: true, + authorityBrowserCacheMode: "minimal", + }; + harness.api.setAuthorityCapabilityState({ + installed: true, + healthy: true, + sessionReady: true, + permissionReady: true, + minimumFeatureSetReady: true, + serverPrimaryReady: true, + storagePrimaryReady: true, + triviumPrimaryReady: true, + jobsReady: true, + blobReady: true, + features: [ + "sql.query", + "sql.mutation", + "trivium.search", + "jobs", + "blob", + ], + supportedJobTypes: ["delay"], + supportedJobTypesKnown: true, + reason: "ok", + lastProbeAt: Date.now(), + }); + harness.api.setCurrentGraph( + stampPersistedGraph( + createMeaningfulGraph(chatId, "luker-authority-sql"), + { + revision: 9, + integrity: persistenceChatId, + chatId, + reason: "luker-authority-sql-seed", + }, + ), + ); + + const result = await harness.api.persistExtractionBatchResult({ + reason: "luker-authority-sql-persist", + lastProcessedAssistantFloor: 9, + }); + + assert.equal(result.accepted, true); + assert.equal(result.storageTier, "authority-sql"); + assert.equal(result.acceptedBy, "authority-sql"); + assert.equal(result.primaryTier, "authority-sql"); + assert.equal(result.cacheTier, "none"); + assert.equal( + await harness.runtimeContext.__chatContext.getChatState( + LUKER_GRAPH_MANIFEST_NAMESPACE, + ), + null, + "Authority SQL primary in Luker must not be preempted by Luker sidecar manifest", + ); + assert.equal( + Number(harness.api.getAuthoritySnapshotForChat(persistenceChatId)?.meta?.revision || 0), + result.revision, + "Authority SQL snapshot should receive the accepted persist revision", + ); +} + { const chatId = "chat-luker-no-authority-primary"; const harness = await createGraphPersistenceHarness({ diff --git a/tests/vector-connection-probe.mjs b/tests/vector-connection-probe.mjs new file mode 100644 index 0000000..c3ea3e6 --- /dev/null +++ b/tests/vector-connection-probe.mjs @@ -0,0 +1,58 @@ +import assert from "node:assert/strict"; +import { installResolveHooks, toDataModuleUrl } from "./helpers/register-hooks-compat.mjs"; + +installResolveHooks([ + { specifiers: ["../../../../../script.js"], url: toDataModuleUrl("export function getRequestHeaders() { return {}; }") }, + { specifiers: ["../../../../extensions.js"], url: toDataModuleUrl("export const extension_settings = { st_bme: {} };") }, +]); + +const { testVectorConnection } = await import("../vector/vector-index.js"); + +function jsonResponse(payload) { + return new Response(JSON.stringify(payload), { status: 200, headers: { "Content-Type": "application/json" } }); +} + +async function withFetch(handler, fn) { + const previousFetch = globalThis.fetch; + globalThis.fetch = handler; + try { return await fn(); } finally { globalThis.fetch = previousFetch; } +} + +{ + const calls = []; + const result = await withFetch(async (_url, options = {}) => { + const body = JSON.parse(String(options.body || "{}")); + calls.push(body); + assert.equal(Array.isArray(body.input), true); + return jsonResponse({ data: body.input.map((text, index) => ({ index, embedding: [1, index, String(text).length] })) }); + }, async () => await testVectorConnection({ mode: "direct", apiUrl: "https://example.com/v1", apiKey: "sk-test", model: "test-embedding" })); + assert.equal(result.success, true); + assert.equal(result.dimensions, 3); + assert.equal(result.batchCapable, true); + assert.equal(result.mode, "direct"); + assert.deepEqual(calls[0].input, ["test connection", "runtime batch probe"]); +} + +{ + const calls = []; + const result = await withFetch(async (url, options = {}) => { + const body = JSON.parse(String(options.body || "{}")); + calls.push({ url: String(url), body }); + if (String(url) === "/api/vector/embed") { + assert.equal(Array.isArray(body.texts), true); + return jsonResponse({ vectors: body.texts.map((text, index) => [2, index, String(text).length]) }); + } + assert.equal(String(url), "/api/vector/query"); + return jsonResponse({ hashes: [] }); + }, async () => await testVectorConnection({ mode: "backend", source: "openai", model: "text-embedding-3-small" })); + assert.equal(result.success, true); + assert.equal(result.dimensions, 3); + assert.equal(result.batchCapable, true); + assert.equal(result.vectorStoreCapable, true); + assert.equal(result.mode, "backend"); + assert.deepEqual(calls[0].body.texts, ["test connection", "runtime batch probe"]); + assert.equal(calls[1].url, "/api/vector/query"); + assert.equal(calls[1].body.searchText, "test connection"); +} + +console.log("vector-connection-probe tests passed"); diff --git a/tests/vector-sync-coalescer.mjs b/tests/vector-sync-coalescer.mjs new file mode 100644 index 0000000..93362da --- /dev/null +++ b/tests/vector-sync-coalescer.mjs @@ -0,0 +1,44 @@ +import assert from "node:assert/strict"; +import { + createVectorSyncCoalescer, + mergeVectorSyncRange, + normalizeVectorSyncRange, +} from "../runtime/vector-sync-coalescer.js"; + +assert.deepEqual(normalizeVectorSyncRange({ start: 9, end: 3 }), { start: 3, end: 9 }); +assert.equal(normalizeVectorSyncRange({ start: "x", end: 3 }), null); +assert.deepEqual(mergeVectorSyncRange({ start: 2, end: 4 }, { start: 9, end: 6 }), { start: 2, end: 9 }); +assert.equal(mergeVectorSyncRange(null, { start: 1, end: 2 }), null); + +const coalescer = createVectorSyncCoalescer(); +const first = coalescer.enqueue({ id: "first", chatId: "chat-a", modelScope: "direct:model", range: { start: 4, end: 8 }, mode: "balanced", reason: "after-extraction" }); +assert.equal(first.scheduled, true); +const second = coalescer.enqueue({ id: "second", chatId: "chat-a", modelScope: "direct:model", range: { start: 1, end: 2 }, mode: "fast", reason: "after-edit" }); +assert.equal(second.scheduled, false); +assert.equal(second.coalesced, true); +assert.equal(second.task.id, "first"); +assert.deepEqual(second.task.range, { start: 1, end: 8 }); +assert.equal(second.task.mode, "fast"); + +assert.equal(coalescer.start(first.task), true); +const third = coalescer.enqueue({ id: "third", chatId: "chat-a", modelScope: "direct:model", range: { start: 10, end: 12 } }); +assert.equal(third.scheduled, true); +assert.equal(third.task.id, "third"); +const fourth = coalescer.enqueue({ id: "fourth", chatId: "chat-a", modelScope: "direct:model", range: { start: 20, end: 21 } }); +assert.equal(fourth.scheduled, false); +assert.deepEqual(third.task.range, { start: 10, end: 21 }); + +coalescer.clear("chat-changed"); +assert.equal(coalescer.isStale(first.task, "chat-a"), true); +assert.equal(coalescer.isStale(third.task, "chat-a"), true); + +const rejected = createVectorSyncCoalescer(); +const rejectedFirst = rejected.enqueue({ id: "rejected-first", chatId: "chat-a", modelScope: "direct:model" }); +assert.equal(rejected.drop(rejectedFirst.task, "queue-full"), true); +assert.equal(rejected.getPending(), null, "drop returns pending state to empty after queue rejection"); +assert.equal(rejected.isStale(rejectedFirst.task, "chat-a"), true); +const rejectedSecond = rejected.enqueue({ id: "rejected-second", chatId: "chat-a", modelScope: "direct:model" }); +assert.equal(rejectedSecond.scheduled, true, "new task should schedule after rejected pending is dropped"); +assert.equal(rejectedSecond.task.id, "rejected-second"); + +console.log("vector-sync-coalescer tests passed"); diff --git a/vector/authority-vector-primary-adapter.js b/vector/authority-vector-primary-adapter.js index 5ebc518..ade3bde 100644 --- a/vector/authority-vector-primary-adapter.js +++ b/vector/authority-vector-primary-adapter.js @@ -2,8 +2,9 @@ import { normalizeAuthorityBaseUrl } from "../runtime/authority-capabilities.js" import { AUTHORITY_PROTOCOL_SERVER_PLUGIN_V06, AuthorityHttpClient, + AuthorityHttpError, } from "../runtime/authority-http-client.js"; -import { embedText } from "./embedding.js"; +import { embedBatch } from "./embedding.js"; export const AUTHORITY_VECTOR_MODE = "authority"; export const AUTHORITY_VECTOR_SOURCE = "authority-trivium"; @@ -14,6 +15,7 @@ const MAX_AUTHORITY_VECTOR_CHUNK_SIZE = 2000; const DEFAULT_AUTHORITY_PURGE_PAGE_SIZE = 200; const DEFAULT_AUTHORITY_PURGE_MAX_PAGES = 1000; const DEFAULT_AUTHORITY_EMBEDDING_BACKEND_SOURCE = "openai"; +const AUTHORITY_CONNECTION_PROBE_TEXTS = ["test connection", "runtime batch probe"]; function clampInteger(value, fallback, min, max) { const parsed = Number(value); @@ -89,6 +91,25 @@ function hasPlainKeys(value = null) { return isPlainObject(value) && Object.keys(value).length > 0; } +function getAuthorityErrorCategory(error = null) { + return String(error?.category || error?.errorCategory || "").trim(); +} + +function getAuthorityErrorDomain(error = null) { + if (!error) return ""; + return error instanceof AuthorityHttpError || getAuthorityErrorCategory(error) ? "authority" : ""; +} + +function buildAuthorityErrorDiagnostics(error = null) { + const category = getAuthorityErrorCategory(error); + const domain = getAuthorityErrorDomain(error); + return { + ...(category ? { errorCategory: category, authorityErrorCategory: category } : {}), + ...(domain ? { errorDomain: domain, authorityErrorDomain: domain } : {}), + ...(Number(error?.status || 0) > 0 ? { status: Number(error.status) } : {}), + }; +} + function normalizeOpenAICompatibleBaseUrl(value) { return String(value || "") .trim() @@ -816,6 +837,7 @@ export async function upsertAuthorityTriviumEntries(graph, config = {}, entries durationMs: roundMs(nowMs() - chunkStartedAt), ok: false, error: error?.message || String(error), + ...buildAuthorityErrorDiagnostics(error), }); error.authorityDiagnostics = { operation: "bulkUpsert", @@ -824,6 +846,7 @@ export async function upsertAuthorityTriviumEntries(graph, config = {}, entries chunks, totalBytes, totalMs: roundMs(nowMs() - startedAt), + ...buildAuthorityErrorDiagnostics(error), }; throw error; } @@ -920,9 +943,19 @@ export async function searchAuthorityTriviumNodes(graph, text, config = {}, opti } export async function testAuthorityTriviumConnection(config = {}, options = {}) { - const probeVector = await embedText("test connection", config, { isQuery: true }); + const probeVectors = await embedBatch(AUTHORITY_CONNECTION_PROBE_TEXTS, config, { + isQuery: true, + }); + const probeVector = probeVectors.find((vector) => vector && vector.length > 0); if (!probeVector || probeVector.length === 0) { - return { success: false, dimensions: 0, error: "Embedding API 返回空结果" }; + return { + success: false, + dimensions: 0, + error: "Embedding API 批量返回空结果", + batchCapable: false, + mode: AUTHORITY_VECTOR_MODE, + source: AUTHORITY_VECTOR_SOURCE, + }; } const client = createAuthorityTriviumClient(config, options); await callClient(client, ["stat"], "stat", { @@ -930,5 +963,12 @@ export async function testAuthorityTriviumConnection(config = {}, options = {}) collectionId: options.collectionId, chatId: options.chatId, }); - return { success: true, dimensions: probeVector.length, error: "" }; + return { + success: true, + dimensions: probeVector.length, + error: "", + batchCapable: true, + mode: AUTHORITY_VECTOR_MODE, + source: AUTHORITY_VECTOR_SOURCE, + }; } diff --git a/vector/embedding.js b/vector/embedding.js index d90f2d7..f52ddfc 100644 --- a/vector/embedding.js +++ b/vector/embedding.js @@ -12,6 +12,8 @@ import { resolveConfiguredTimeoutMs } from "../runtime/request-timeout.js"; const MODULE_NAME = "st_bme"; const EMBEDDING_REQUEST_TIMEOUT_MS = 300000; +const DEFAULT_EMBEDDING_BATCH_SIZE = 10; +const MAX_EMBEDDING_BATCH_SIZE = 100; const BACKEND_SOURCES_REQUIRING_API_URL = new Set([ "ollama", "llamacpp", @@ -110,6 +112,94 @@ async function requestBackendEmbeddings(config = {}, payload = {}, { signal } = return await response.json().catch(() => ({})); } +function getEmbeddingBatchSize(config = {}) { + const parsed = Number(config?.embeddingBatchSize ?? config?.batchSize); + if (!Number.isFinite(parsed) || parsed <= 0) { + return DEFAULT_EMBEDDING_BATCH_SIZE; + } + return Math.min(MAX_EMBEDDING_BATCH_SIZE, Math.max(1, Math.trunc(parsed))); +} + +function chunkTexts(texts = [], size = DEFAULT_EMBEDDING_BATCH_SIZE) { + const chunks = []; + for (let start = 0; start < texts.length; start += size) { + chunks.push({ start, texts: texts.slice(start, start + size) }); + } + return chunks; +} + +async function requestDirectEmbeddingBatch(texts, config = {}, { signal } = {}) { + const apiUrl = normalizeOpenAICompatibleBaseUrl(config?.apiUrl); + const response = await fetchWithTimeout( + apiUrl + "/embeddings", + { + method: "POST", + headers: { + "Content-Type": "application/json", + ...(config.apiKey ? { Authorization: "Bearer " + config.apiKey } : {}), + }, + signal, + body: JSON.stringify({ + model: config.model, + input: texts, + }), + }, + getConfiguredTimeoutMs(config), + ); + + if (!response.ok) { + const errorText = await response.text().catch(() => response.statusText); + const error = new Error(errorText || response.statusText || "HTTP " + response.status); + error.status = response.status; + throw error; + } + + const data = await response.json().catch(() => ({})); + const embeddings = Array.isArray(data?.data) ? data.data : null; + if (!embeddings) { + throw new Error("Embedding API 返回格式异常"); + } + + const results = new Array(texts.length).fill(null); + embeddings.forEach((item, order) => { + const rawIndex = Number(item?.index); + const index = Number.isInteger(rawIndex) ? rawIndex : order; + if (index >= 0 && index < results.length) { + results[index] = normalizeVector(item?.embedding); + } + }); + return results; +} + +async function requestBackendEmbeddingBatch(texts, config = {}, { signal, isQuery = false } = {}) { + const payload = await requestBackendEmbeddings( + config, + { texts, isQuery }, + { signal }, + ); + const vectors = Array.isArray(payload?.vectors) ? payload.vectors : null; + if (!vectors) { + throw new Error("Backend Embedding API 返回格式异常"); + } + return texts.map((_, index) => normalizeVector(vectors[index])); +} + +async function fallbackEmbedChunkTexts(texts, config = {}, { signal, isQuery = false } = {}) { + const vectors = []; + for (const text of texts) { + try { + vectors.push(await embedText(text, config, { signal, isQuery })); + } catch (error) { + if (isAbortError(error)) { + throw error; + } + console.error("[ST-BME] Embedding 单条回退失败:", error); + vectors.push(null); + } + } + return vectors; +} + function createCombinedAbortSignal(...signals) { const validSignals = signals.filter(Boolean); if (validSignals.length <= 1) { @@ -264,91 +354,78 @@ export async function embedText(text, config, { signal, isQuery = false } = {}) * @returns {Promise<(Float64Array|null)[]>} */ export async function embedBatch(texts, config, { signal, isQuery = false } = {}) { + const normalizedTexts = Array.isArray(texts) + ? texts.map((item) => String(item ?? "")) + : []; const override = getEmbeddingTestOverride("embedBatch"); if (override) { - return await override(texts, config, { signal, isQuery }); + return await override(normalizedTexts, config, { signal, isQuery }); } - if (readEmbeddingMode(config) === "backend") { - if (!texts.length || !config?.model) { - return texts.map(() => null); - } - try { - const payload = await requestBackendEmbeddings( - config, - { texts, isQuery }, - { signal }, - ); - const vectors = Array.isArray(payload?.vectors) ? payload.vectors : []; - return texts.map((_, index) => normalizeVector(vectors[index])); - } catch (e) { - if (isAbortError(e)) { - throw e; - } - console.error("[ST-BME] Backend Embedding 批量调用失败:", e); - return texts.map(() => null); - } + if (!normalizedTexts.length) { + return []; } + const isBackend = readEmbeddingMode(config) === "backend"; const apiUrl = normalizeOpenAICompatibleBaseUrl(config?.apiUrl); - if (!texts.length || !apiUrl || !config?.model) { - return texts.map(() => null); + if (!config?.model || (!isBackend && !apiUrl)) { + return normalizedTexts.map(() => null); } - try { - const response = await fetchWithTimeout( - `${apiUrl}/embeddings`, - { - method: "POST", - headers: { - "Content-Type": "application/json", - ...(config.apiKey - ? { Authorization: `Bearer ${config.apiKey}` } - : {}), - }, - signal, - body: JSON.stringify({ - model: config.model, - input: texts, - }), - }, - getConfiguredTimeoutMs(config), - ); - - if (!response.ok) { - const errorText = await response.text(); - console.error( - `[ST-BME] Embedding API 批量错误 (${response.status}):`, - errorText, - ); - return texts.map(() => null); - } - - const data = await response.json(); - const embeddings = data?.data; - - if (!Array.isArray(embeddings)) { - return texts.map(() => null); - } - - // 按 index 排序(API 可能不保证顺序) - embeddings.sort((a, b) => a.index - b.index); - - return embeddings.map((item) => { - if (item?.embedding && Array.isArray(item.embedding)) { - return new Float64Array(item.embedding); + const results = new Array(normalizedTexts.length).fill(null); + const batchSize = getEmbeddingBatchSize(config); + for (const chunk of chunkTexts(normalizedTexts, batchSize)) { + let vectors = null; + try { + vectors = isBackend + ? await requestBackendEmbeddingBatch(chunk.texts, config, { signal, isQuery }) + : await requestDirectEmbeddingBatch(chunk.texts, config, { signal }); + } catch (error) { + if (isAbortError(error)) { + throw error; } - return null; - }); - } catch (e) { - if (isAbortError(e)) { - throw e; + console.error( + isBackend + ? "[ST-BME] Backend Embedding 批量调用失败:" + : "[ST-BME] Embedding API 批量调用失败:", + error, + ); } - console.error("[ST-BME] Embedding API 批量调用失败:", e); - return texts.map(() => null); - } -} + if (!vectors || vectors.length < chunk.texts.length) { + vectors = await fallbackEmbedChunkTexts(chunk.texts, config, { + signal, + isQuery, + }); + } else { + const missingIndexes = []; + for (let index = 0; index < chunk.texts.length; index++) { + if (!vectors[index]) { + missingIndexes.push(index); + } + } + if (missingIndexes.length > 0) { + const fallbackVectors = await fallbackEmbedChunkTexts( + missingIndexes.map((index) => chunk.texts[index]), + config, + { + signal, + isQuery, + }, + ); + missingIndexes.forEach((missingIndex, fallbackIndex) => { + vectors[missingIndex] = fallbackVectors[fallbackIndex] || null; + }); + } + } + + for (let index = 0; index < chunk.texts.length; index++) { + results[chunk.start + index] = vectors[index] || null; + } + } + + return results; +} /** * 计算两个向量的 cosine 相似度 * diff --git a/vector/vector-index.js b/vector/vector-index.js index bbccd90..dcd6d14 100644 --- a/vector/vector-index.js +++ b/vector/vector-index.js @@ -64,6 +64,8 @@ function getConfiguredTimeoutMs(config = {}) { })(); } +const VECTOR_CONNECTION_PROBE_TEXTS = ["test connection", "runtime batch probe"]; + const BACKEND_STATUS_MODEL_SOURCES = { openai: "openai", cohere: "cohere", @@ -626,6 +628,7 @@ function markAuthorityVectorStateDirty( config = {}, reason = "authority-trivium-failed", warning = "Authority Trivium 索引失败,已标记待重建", + diagnostics = {}, ) { if (!graph?.vectorIndexState || !isAuthorityVectorConfig(config)) { return; @@ -655,6 +658,39 @@ function markAuthorityVectorStateDirty( pending: total > 0 ? Math.max(1, Number(state.lastStats?.pending || 0)) : 0, }; state.lastWarning = String(warning || "Authority Trivium 索引失败,已标记待重建"); + const errorCategory = String(diagnostics.errorCategory || diagnostics.authorityErrorCategory || "").trim(); + const errorDomain = String(diagnostics.errorDomain || diagnostics.authorityErrorDomain || "").trim(); + if (errorCategory) state.lastErrorCategory = errorCategory; + if (errorDomain) state.lastErrorDomain = errorDomain; +} + +function getErrorCategory(error = null) { + return String(error?.category || error?.errorCategory || "").trim(); +} + +function getErrorDomain(error = null, fallback = "") { + if (!error) return ""; + if (error?.errorDomain) return String(error.errorDomain).trim(); + if (getErrorCategory(error)) return fallback || "authority"; + return fallback; +} + +function getAuthorityDiagnosticsErrorPatch(error = null) { + const errorCategory = getErrorCategory(error); + const errorDomain = getErrorDomain(error, errorCategory ? "authority" : ""); + return { + ...(errorCategory ? { errorCategory, authorityErrorCategory: errorCategory } : {}), + ...(errorDomain ? { errorDomain, authorityErrorDomain: errorDomain } : {}), + ...(Number(error?.status || 0) > 0 ? { status: Number(error.status) } : {}), + }; +} + +function createEmbeddingProviderError(failures = 0) { + const count = Math.max(0, Math.floor(Number(failures) || 0)); + const error = new Error(`Embedding provider failed for ${count} item(s)`); + error.errorCategory = "embedding-provider"; + error.errorDomain = "embedding"; + return error; } async function ensureEntryEmbeddings(graph, entries = [], config = {}, signal = undefined) { @@ -802,7 +838,7 @@ export async function syncGraphVectorIndex( embeddingsRequested += embeddingResult.requested; embedBatchMs += embeddingResult.elapsedMs; if (embeddingResult.failures > 0) { - throw new Error(`Authority Trivium embedding failed for ${embeddingResult.failures} item(s)`); + throw createEmbeddingProviderError(embeddingResult.failures); } const purgeStartedAt = nowMs(); const purgeResult = await purgeAuthorityTriviumNamespace(config, authorityOptions); @@ -866,7 +902,7 @@ export async function syncGraphVectorIndex( embeddingsRequested += embeddingResult.requested; embedBatchMs += embeddingResult.elapsedMs; if (embeddingResult.failures > 0) { - throw new Error(`Authority Trivium embedding failed for ${embeddingResult.failures} item(s)`); + throw createEmbeddingProviderError(embeddingResult.failures); } deletedNodeCount = nodeIdsToDelete.length; const deleteStartedAt = nowMs(); @@ -909,17 +945,29 @@ export async function syncGraphVectorIndex( } catch (error) { if (isAbortError(error)) throw error; const message = error?.message || String(error) || "Authority Trivium 同步失败"; + const errorCategory = getErrorCategory(error); + const errorDomain = getErrorDomain(error, errorCategory ? "authority" : ""); + const dirtyReason = errorDomain === "embedding" + ? "embedding-provider-sync-failed" + : "authority-trivium-sync-failed"; + const warningPrefix = errorDomain === "embedding" + ? "Embedding provider 同步失败" + : "Authority Trivium 同步失败"; markAuthorityVectorStateDirty( graph, config, - "authority-trivium-sync-failed", - `Authority Trivium 同步失败(${message}),已标记待重建`, + dirtyReason, + `${warningPrefix}(${message}),已标记待重建`, + { errorCategory, errorDomain }, ); state.lastSyncAt = Date.now(); state.lastTimings = { mode: syncMode, success: false, error: message, + ...(errorCategory ? { errorCategory } : {}), + ...(errorDomain ? { errorDomain } : {}), + ...(errorCategory && errorDomain === "authority" ? { authorityErrorCategory: errorCategory, authorityErrorDomain: errorDomain } : {}), desiredEntries: Number(desiredBuildDiagnostics.entryCount || desiredEntries.length), desiredBuildMs: roundMs(desiredBuildMs), authorityPurgeMs: roundMs(authorityPurgeMs), @@ -940,6 +988,8 @@ export async function syncGraphVectorIndex( stats: state.lastStats, timings: state.lastTimings, error: message, + ...(errorCategory ? { errorCategory } : {}), + ...(errorDomain ? { errorDomain } : {}), }; if (config.failOpen === false) { throw error; @@ -1291,17 +1341,20 @@ export async function findSimilarNodesByText( throw error; } const message = error?.message || String(error) || "Authority Trivium 查询失败"; + const errorPatch = getAuthorityDiagnosticsErrorPatch(error); markAuthorityVectorStateDirty( graph, config, "authority-trivium-query-failed", `Authority Trivium 查询失败(${message}),已标记待重建`, + errorPatch, ); recordSearchTimings({ success: false, reason: "authority-trivium-query-failed", requestMs: roundMs(nowMs() - requestStartedAt), error: message, + ...errorPatch, resultCount: 0, }); if (config.failOpen === false) { @@ -1406,15 +1459,69 @@ export async function testVectorConnection(config, chatId = "connection-test") { return { success: false, dimensions: 0, error: validation.error }; } - if (isDirectVectorConfig(config)) { + if (isDirectVectorConfig(config) || isBackendVectorConfig(config)) { try { - const vec = await embedText("test connection", config); - if (vec) { - return { success: true, dimensions: vec.length, error: "" }; + const vectors = await embedBatch(VECTOR_CONNECTION_PROBE_TEXTS, config, { + isQuery: true, + }); + const firstVector = vectors.find((vector) => vector && vector.length > 0); + if (firstVector) { + if (isBackendVectorConfig(config)) { + const response = await fetchWithTimeout( + "/api/vector/query", + { + method: "POST", + headers: getRequestHeaders(), + body: JSON.stringify({ + collectionId: buildVectorCollectionId(chatId), + searchText: VECTOR_CONNECTION_PROBE_TEXTS[0], + topK: 1, + threshold: 0, + ...buildBackendSourceRequest(config), + }), + }, + getConfiguredTimeoutMs(config), + ); + const payload = await response.text().catch(() => ""); + if (!response.ok) { + return { + success: false, + dimensions: firstVector.length, + error: payload || response.statusText, + batchCapable: true, + vectorStoreCapable: false, + mode: config.mode, + source: config.source || "backend", + }; + } + } + return { + success: true, + dimensions: firstVector.length, + error: "", + batchCapable: true, + vectorStoreCapable: isBackendVectorConfig(config) ? true : undefined, + mode: config.mode, + source: config.source || "direct", + }; } - return { success: false, dimensions: 0, error: "API 返回空结果" }; + return { + success: false, + dimensions: 0, + error: "批量 Embedding API 返回空结果", + batchCapable: false, + mode: config.mode, + source: config.source || "direct", + }; } catch (error) { - return { success: false, dimensions: 0, error: String(error) }; + return { + success: false, + dimensions: 0, + error: String(error), + batchCapable: false, + mode: config.mode, + source: config.source || "direct", + }; } } @@ -1429,38 +1536,8 @@ export async function testVectorConnection(config, chatId = "connection-test") { } } - try { - const response = await fetchWithTimeout( - "/api/vector/query", - { - method: "POST", - headers: getRequestHeaders(), - body: JSON.stringify({ - collectionId: buildVectorCollectionId(chatId), - searchText: "test connection", - topK: 1, - threshold: 0, - ...buildBackendSourceRequest(config), - }), - }, - getConfiguredTimeoutMs(config), - ); - - const payload = await response.text().catch(() => ""); - if (!response.ok) { - return { - success: false, - dimensions: 0, - error: payload || response.statusText, - }; - } - - return { success: true, dimensions: 0, error: "" }; - } catch (error) { - return { success: false, dimensions: 0, error: String(error) }; - } + return { success: false, dimensions: 0, error: "未知向量配置" }; } - export function getVectorIndexStats(graph) { const state = graph?.vectorIndexState; if (!state) {