diff --git a/index.js b/index.js index d0b5784..ce44e8e 100644 --- a/index.js +++ b/index.js @@ -288,6 +288,7 @@ import { rebindProcessedHistoryStateToChat, snapshotProcessedMessageHashes, undoLatestMaintenance, + buildVectorCollectionId, } from "./runtime/runtime-state.js"; import { DEFAULT_NODE_SCHEMA, validateSchema } from "./graph/schema.js"; import { @@ -358,6 +359,7 @@ import { fetchAvailableEmbeddingModels, getVectorConfigFromSettings, getVectorIndexStats, + getVectorModelScope, isAuthorityVectorConfig, isBackendVectorConfig, isDirectVectorConfig, @@ -366,6 +368,11 @@ import { testVectorConnection, validateVectorConfig, } from "./vector/vector-index.js"; +import { + buildAuthorityJobIdempotencyKey, + createAuthorityJobAdapter, + normalizeAuthorityJobConfig, +} from "./maintenance/authority-job-adapter.js"; export { DEFAULT_TRIGGER_KEYWORDS, getSmartTriggerDecision }; @@ -1497,6 +1504,7 @@ function buildAuthorityPersistenceStatePatch(settings = getSettings()) { authorityServerPrimaryReady: Boolean(capability.serverPrimaryReady), authorityStoragePrimaryReady: Boolean(capability.storagePrimaryReady), authorityTriviumPrimaryReady: Boolean(capability.triviumPrimaryReady), + authorityJobsReady: Boolean(capability.jobsReady), authorityBrowserCacheMode: String(browserState.mode || "minimal"), authorityOfflineQueueBytes: Number(browserState.offlineQueueBytes || 0), authorityOfflineQueueItems: Number(browserState.offlineQueueItems || 0), @@ -1903,6 +1911,162 @@ function updateGraphPersistenceState(patch = {}) { return graphPersistenceState; } +function getAuthorityJobAdapter(options = {}) { + const settings = getSettings(); + const config = normalizeAuthorityJobConfig(settings); + return createAuthorityJobAdapter(config, { + fetchImpl: globalThis.fetch?.bind(globalThis), + headerProvider: + typeof getRequestHeaders === "function" ? () => getRequestHeaders() : null, + ...options, + }); +} + +function shouldUseAuthorityJobs(config = null) { + const settings = getSettings(); + const { capability } = getAuthorityRuntimeSnapshot(settings); + const jobConfig = normalizeAuthorityJobConfig(settings); + return Boolean( + jobConfig.enabled && + capability.jobsReady && + settings.authorityJobsEnabled !== false && + isAuthorityVectorConfig(config), + ); +} + +function recordAuthorityJobSnapshot(job = null, options = {}) { + const normalizedJob = + job && typeof job === "object" && !Array.isArray(job) ? job : {}; + const progress = Number(normalizedJob.progress || 0); + const status = String(normalizedJob.status || options.status || ""); + const error = String(normalizedJob.error || options.error || ""); + const queueState = + options.queueState || + (error + ? "error" + : normalizedJob.terminal + ? normalizedJob.success + ? "success" + : "failed" + : normalizedJob.id + ? "running" + : "idle"); + updateGraphPersistenceState({ + authorityJobQueueState: queueState, + authorityLastJob: cloneRuntimeDebugValue(normalizedJob, null), + authorityLastJobId: String(normalizedJob.id || options.jobId || ""), + authorityLastJobKind: String(normalizedJob.kind || options.kind || ""), + authorityLastJobStatus: status, + authorityLastJobProgress: Number.isFinite(progress) + ? Math.max(0, Math.min(1, progress)) + : 0, + authorityLastJobError: error, + authorityLastJobUpdatedAt: new Date().toISOString(), + }); +} + +async function submitAuthorityVectorRebuildJob({ + config = null, + range = null, + purge = true, + signal = undefined, +} = {}) { + const vectorConfig = config || getEmbeddingConfig(); + if (!shouldUseAuthorityJobs(vectorConfig)) { + return { + submitted: false, + fallbackRequired: true, + reason: "authority-jobs-unavailable", + }; + } + + ensureCurrentGraphRuntimeState(); + const chatId = getCurrentChatId(); + const collectionId = + currentGraph?.vectorIndexState?.collectionId || buildVectorCollectionId(chatId); + const kind = range + ? "authority.vector.rebuild-range" + : "authority.vector.rebuild"; + const idempotencyKey = buildAuthorityJobIdempotencyKey({ + kind, + chatId, + collectionId, + revision: + currentGraph?.meta?.revision || + currentGraph?.historyState?.extractionCount || + graphPersistenceState?.revision || + 0, + range, + }); + const payload = { + chatId, + collectionId, + namespace: collectionId, + modelScope: getVectorModelScope(vectorConfig), + source: "authority-trivium", + purge: Boolean(purge), + range: range || null, + graphRevision: + currentGraph?.meta?.revision || graphPersistenceState?.revision || 0, + idempotencyKey, + }; + + try { + const adapter = getAuthorityJobAdapter(); + const job = await adapter.submit(kind, payload, { + idempotencyKey, + signal, + }); + recordAuthorityJobSnapshot(job, { kind, queueState: "running" }); + if (currentGraph?.vectorIndexState) { + currentGraph.vectorIndexState.dirty = true; + currentGraph.vectorIndexState.dirtyReason = "authority-vector-rebuild-job-submitted"; + currentGraph.vectorIndexState.lastWarning = + "Authority 向量重建 Job 已提交,等待服务端完成"; + currentGraph.vectorIndexState.lastRebuildJob = + cloneRuntimeDebugValue(job, null); + } + setLastVectorStatus( + "向量重建 Job 已提交", + `${kind} · ${job.id || "pending"} · ${job.status || "queued"}`, + "running", + { syncRuntime: true }, + ); + return { + submitted: true, + fallbackRequired: false, + job, + stats: getVectorIndexStats(currentGraph), + insertedHashes: [], + }; + } catch (error) { + const message = error?.message || String(error) || "Authority Job 提交失败"; + recordAuthorityJobSnapshot(null, { + kind, + queueState: "fallback", + error: message, + }); + return { + submitted: false, + fallbackRequired: true, + error: message, + }; + } +} + +async function requeueAuthorityJob(jobId, options = {}) { + try { + const adapter = getAuthorityJobAdapter(); + const job = await adapter.requeue(jobId, options); + recordAuthorityJobSnapshot(job, { queueState: "running" }); + return { success: true, job }; + } catch (error) { + const message = error?.message || String(error) || "Authority Job 重试失败"; + recordAuthorityJobSnapshot(null, { queueState: "error", error: message }); + return { success: false, error: message }; + } +} + function recordIgnoredMutationEvent(eventName = "", detail = {}) { updateGraphPersistenceState({ lastIgnoredMutationEvent: String(eventName || ""), @@ -20104,6 +20268,8 @@ async function onRebuildVectorIndex(range = null) { isBackendVectorConfig, refreshPanelLiveState, saveGraphToChat, + shouldUseAuthorityJobs, + submitAuthorityVectorRebuildJob, syncVectorState, toastr, validateVectorConfig, @@ -20683,6 +20849,7 @@ async function onCompactLukerSidecar() { updateRegionAdjacency: onUpdatePanelRegionAdjacency, rebuildVectorIndex: () => onRebuildVectorIndex(), rebuildVectorRange: (range) => onRebuildVectorIndex(range), + requeueAuthorityJob: async (jobId) => await requeueAuthorityJob(jobId), reembedDirect: onReembedDirect, reroll: onReroll, clearGraph: onClearGraph, diff --git a/maintenance/authority-job-adapter.js b/maintenance/authority-job-adapter.js new file mode 100644 index 0000000..ef73bda --- /dev/null +++ b/maintenance/authority-job-adapter.js @@ -0,0 +1,335 @@ +import { normalizeAuthorityBaseUrl } from "../runtime/authority-capabilities.js"; + +export const AUTHORITY_JOB_ENDPOINT = "/v1/jobs"; +export const AUTHORITY_JOB_STATUS_TERMINAL = new Set([ + "completed", + "succeeded", + "success", + "failed", + "error", + "cancelled", + "canceled", + "timeout", +]); +export const AUTHORITY_JOB_STATUS_SUCCESS = new Set([ + "completed", + "succeeded", + "success", +]); + +function toPlainData(value, fallbackValue = null) { + if (value == null) return fallbackValue; + if (typeof globalThis.structuredClone === "function") { + try { + return globalThis.structuredClone(value); + } catch { + } + } + try { + return JSON.parse(JSON.stringify(value)); + } catch { + return fallbackValue; + } +} + +function normalizeRecordId(value) { + return String(value ?? "").trim(); +} + +function normalizeInteger(value, fallback = 0, min = 0, max = Number.MAX_SAFE_INTEGER) { + const parsed = Number(value); + if (!Number.isFinite(parsed)) return fallback; + return Math.min(max, Math.max(min, Math.trunc(parsed))); +} + +function normalizeProgress(value = null) { + if (typeof value === "number") { + return Math.max(0, Math.min(1, value > 1 ? value / 100 : value)); + } + if (!value || typeof value !== "object" || Array.isArray(value)) return 0; + const direct = Number(value.progress ?? value.ratio ?? value.percent); + if (Number.isFinite(direct)) { + return Math.max(0, Math.min(1, direct > 1 ? direct / 100 : direct)); + } + const current = Number(value.current ?? value.done ?? value.completed ?? value.processed); + const total = Number(value.total ?? value.count ?? value.expected); + if (Number.isFinite(current) && Number.isFinite(total) && total > 0) { + return Math.max(0, Math.min(1, current / total)); + } + return 0; +} + +function normalizeJobStatus(value = "queued") { + return String(value || "queued").trim().toLowerCase() || "queued"; +} + +function readJobRows(payload = null) { + if (Array.isArray(payload)) return payload; + if (!payload || typeof payload !== "object") return []; + if (Array.isArray(payload.jobs)) return payload.jobs; + if (Array.isArray(payload.items)) return payload.items; + if (Array.isArray(payload.rows)) return payload.rows; + if (Array.isArray(payload.data)) return payload.data; + if (Array.isArray(payload.result?.jobs)) return payload.result.jobs; + if (Array.isArray(payload.result?.items)) return payload.result.items; + return []; +} + +export function normalizeAuthorityJobRecord(input = null) { + const source = input && typeof input === "object" && !Array.isArray(input) ? input : {}; + const id = normalizeRecordId(source.id || source.jobId || source.job_id || source.key); + const status = normalizeJobStatus(source.status || source.state || source.phase); + const progress = normalizeProgress(source.progress ?? source.progressRatio ?? source.percent); + const kind = String(source.kind || source.type || source.name || "").trim(); + const lastEvent = source.lastEvent && typeof source.lastEvent === "object" + ? toPlainData(source.lastEvent, null) + : source.event && typeof source.event === "object" + ? toPlainData(source.event, null) + : null; + return { + id, + kind, + status, + progress, + terminal: AUTHORITY_JOB_STATUS_TERMINAL.has(status), + success: AUTHORITY_JOB_STATUS_SUCCESS.has(status), + error: String(source.error || source.lastError || source.message || ""), + idempotencyKey: String(source.idempotencyKey || source.idempotency_key || ""), + queue: String(source.queue || source.worker || ""), + createdAt: source.createdAt || source.created_at || source.enqueuedAt || "", + updatedAt: source.updatedAt || source.updated_at || source.finishedAt || "", + lastEvent, + raw: toPlainData(source, source), + }; +} + +export function normalizeAuthorityJobList(payload = null) { + const source = payload && typeof payload === "object" && !Array.isArray(payload) ? payload : {}; + const jobs = readJobRows(payload).map(normalizeAuthorityJobRecord).filter((job) => job.id); + return { + jobs, + nextCursor: String(source.nextCursor || source.next_cursor || source.cursor?.next || ""), + hasMore: Boolean(source.hasMore || source.has_more || source.cursor?.hasMore), + raw: toPlainData(payload, payload), + }; +} + +export function buildAuthorityJobIdempotencyKey({ + kind = "job", + chatId = "", + collectionId = "", + revision = 0, + range = null, + suffix = "", +} = {}) { + const normalizedRange = range && Number.isFinite(Number(range.start)) && Number.isFinite(Number(range.end)) + ? `${Math.min(Number(range.start), Number(range.end))}-${Math.max(Number(range.start), Number(range.end))}` + : "all"; + return [ + "st-bme", + normalizeRecordId(kind) || "job", + normalizeRecordId(chatId) || "unknown-chat", + normalizeRecordId(collectionId) || "unknown-collection", + String(Math.max(0, Math.floor(Number(revision) || 0))), + normalizedRange, + normalizeRecordId(suffix), + ].filter(Boolean).join(":"); +} + +export function normalizeAuthorityJobConfig(settings = {}, overrides = {}) { + const source = settings && typeof settings === "object" && !Array.isArray(settings) ? settings : {}; + return { + baseUrl: normalizeAuthorityBaseUrl(source.authorityBaseUrl ?? source.baseUrl), + enabled: source.authorityJobsEnabled !== false && source.jobsEnabled !== false, + failOpen: source.authorityFailOpen !== false && source.failOpen !== false, + pollIntervalMs: normalizeInteger(source.authorityJobPollIntervalMs ?? source.pollIntervalMs, 1200, 250, 30000), + waitTimeoutMs: normalizeInteger(source.authorityJobWaitTimeoutMs ?? source.waitTimeoutMs, 0, 0, 3600000), + ...overrides, + }; +} + +export class AuthorityJobHttpClient { + constructor(options = {}) { + this.baseUrl = normalizeAuthorityBaseUrl(options.baseUrl); + this.fetchImpl = options.fetchImpl || (typeof fetch === "function" ? fetch.bind(globalThis) : null); + this.headerProvider = typeof options.headerProvider === "function" ? options.headerProvider : null; + } + + async request(action, payload = {}) { + if (typeof this.fetchImpl !== "function") { + throw new Error("Authority Jobs fetch unavailable"); + } + const response = await this.fetchImpl(`${this.baseUrl}${AUTHORITY_JOB_ENDPOINT}`, { + method: "POST", + headers: { + Accept: "application/json", + "Content-Type": "application/json", + ...(this.headerProvider ? this.headerProvider() || {} : {}), + }, + body: JSON.stringify({ action, ...payload }), + }); + if (!response?.ok) { + const text = await response?.text?.().catch(() => ""); + throw new Error(text || `Authority Jobs HTTP ${response?.status || "unknown"}`); + } + return await response.json().catch(() => ({})); + } + + async submit(payload = {}) { + return await this.request("submit", payload); + } + + async listPage(payload = {}) { + return await this.request("listPage", payload); + } + + async waitForCompletion(payload = {}) { + return await this.request("waitForCompletion", payload); + } + + async requeue(payload = {}) { + return await this.request("requeue", payload); + } + + async cancel(payload = {}) { + return await this.request("cancel", payload); + } +} + +export function createAuthorityJobClient(config = {}, options = {}) { + const injected = options.jobClient || config.jobClient || globalThis.__stBmeAuthorityJobClient; + if (injected) return injected; + return new AuthorityJobHttpClient({ + baseUrl: config.baseUrl, + fetchImpl: options.fetchImpl || config.fetchImpl, + headerProvider: options.headerProvider || config.headerProvider, + }); +} + +async function callClient(client, methodNames = [], action = "request", payload = {}) { + for (const methodName of methodNames) { + if (typeof client?.[methodName] === "function") { + return await client[methodName](payload); + } + } + if (typeof client?.request === "function") { + return await client.request(action, payload); + } + if (typeof client === "function") { + return await client({ action, ...payload }); + } + throw new Error(`Authority Jobs ${action} unavailable`); +} + +function throwIfAborted(signal) { + if (signal?.aborted) { + throw signal.reason instanceof Error + ? signal.reason + : Object.assign(new Error("操作已终止"), { name: "AbortError" }); + } +} + +function sleep(ms, signal) { + if (!Number.isFinite(Number(ms)) || Number(ms) <= 0) return Promise.resolve(); + return new Promise((resolve, reject) => { + const timer = setTimeout(resolve, Math.max(0, Math.floor(Number(ms)))); + if (signal) { + signal.addEventListener( + "abort", + () => { + clearTimeout(timer); + reject(signal.reason instanceof Error ? signal.reason : Object.assign(new Error("操作已终止"), { name: "AbortError" })); + }, + { once: true }, + ); + } + }); +} + +export class AuthorityJobAdapter { + constructor(config = {}, options = {}) { + this.config = normalizeAuthorityJobConfig(config, options.configOverrides || {}); + this.client = createAuthorityJobClient(this.config, options); + } + + async submit(kind, payload = {}, options = {}) { + throwIfAborted(options.signal); + const result = await callClient(this.client, ["submit", "enqueue"], "submit", { + kind, + type: kind, + idempotencyKey: options.idempotencyKey || payload.idempotencyKey || "", + payload, + }); + return normalizeAuthorityJobRecord(result?.job || result?.result || result); + } + + async listPage(options = {}) { + throwIfAborted(options.signal); + const result = await callClient(this.client, ["listPage", "list"], "listPage", { + cursor: options.cursor || "", + limit: normalizeInteger(options.limit, 20, 1, 100), + filter: options.filter || {}, + }); + return normalizeAuthorityJobList(result); + } + + async get(jobId, options = {}) { + throwIfAborted(options.signal); + const id = normalizeRecordId(jobId); + if (!id) return normalizeAuthorityJobRecord(null); + const result = await callClient(this.client, ["get", "status"], "get", { jobId: id, id }); + return normalizeAuthorityJobRecord(result?.job || result?.result || result); + } + + async waitForCompletion(jobId, options = {}) { + throwIfAborted(options.signal); + const id = normalizeRecordId(jobId); + if (!id) return normalizeAuthorityJobRecord(null); + if (typeof this.client?.waitForCompletion === "function") { + const result = await this.client.waitForCompletion({ + jobId: id, + id, + timeoutMs: normalizeInteger(options.timeoutMs, this.config.waitTimeoutMs, 0, 3600000), + }); + return normalizeAuthorityJobRecord(result?.job || result?.result || result); + } + + const startedAt = Date.now(); + const timeoutMs = normalizeInteger(options.timeoutMs, this.config.waitTimeoutMs, 0, 3600000); + const pollIntervalMs = normalizeInteger(options.pollIntervalMs, this.config.pollIntervalMs, 250, 30000); + while (true) { + throwIfAborted(options.signal); + const job = await this.get(id, options); + if (job.terminal) return job; + if (timeoutMs > 0 && Date.now() - startedAt >= timeoutMs) { + return { ...job, status: "timeout", terminal: true, success: false, error: "wait timeout" }; + } + await sleep(pollIntervalMs, options.signal); + } + } + + async requeue(jobId, options = {}) { + throwIfAborted(options.signal); + const id = normalizeRecordId(jobId); + const result = await callClient(this.client, ["requeue", "safeRequeue"], "requeue", { + jobId: id, + id, + safe: options.safe !== false, + }); + return normalizeAuthorityJobRecord(result?.job || result?.result || result); + } + + async cancel(jobId, options = {}) { + throwIfAborted(options.signal); + const id = normalizeRecordId(jobId); + const result = await callClient(this.client, ["cancel", "cancelLike"], "cancel", { + jobId: id, + id, + }); + return normalizeAuthorityJobRecord(result?.job || result?.result || result); + } +} + +export function createAuthorityJobAdapter(config = {}, options = {}) { + return new AuthorityJobAdapter(config, options); +} diff --git a/tests/authority-jobs.mjs b/tests/authority-jobs.mjs new file mode 100644 index 0000000..d55cd12 --- /dev/null +++ b/tests/authority-jobs.mjs @@ -0,0 +1,229 @@ +import assert from "node:assert/strict"; +import { + AUTHORITY_JOB_STATUS_TERMINAL, + AUTHORITY_JOB_STATUS_SUCCESS, + buildAuthorityJobIdempotencyKey, + createAuthorityJobAdapter, + normalizeAuthorityJobList, + normalizeAuthorityJobRecord, +} from "../maintenance/authority-job-adapter.js"; +import { onRebuildVectorIndexController } from "../ui/ui-actions-controller.js"; + +function createMockJobClient() { + const calls = []; + const states = new Map(); + return { + calls, + async submit(payload) { + calls.push(["submit", payload]); + const id = `job-${calls.length}`; + const job = { + id, + kind: payload.kind, + status: "queued", + progress: { current: 0, total: 10 }, + idempotencyKey: payload.idempotencyKey, + }; + states.set(id, job); + return { job }; + }, + async get(payload) { + calls.push(["get", payload]); + const previous = states.get(payload.jobId || payload.id); + const next = { + ...previous, + status: "completed", + progress: 1, + }; + states.set(next.id, next); + return { job: next }; + }, + async listPage(payload) { + calls.push(["listPage", payload]); + return { + jobs: Array.from(states.values()), + nextCursor: "next-1", + hasMore: true, + }; + }, + async requeue(payload) { + calls.push(["requeue", payload]); + return { job: { id: payload.jobId, status: "queued", progress: 0 } }; + }, + }; +} + +const normalized = normalizeAuthorityJobRecord({ + job_id: "job-a", + type: "authority.vector.rebuild", + state: "succeeded", + progress: { current: 5, total: 10 }, + idempotency_key: "idem-a", +}); +assert.equal(normalized.id, "job-a"); +assert.equal(normalized.kind, "authority.vector.rebuild"); +assert.equal(normalized.status, "succeeded"); +assert.equal(normalized.progress, 0.5); +assert.equal(normalized.terminal, true); +assert.equal(normalized.success, true); +assert.equal(AUTHORITY_JOB_STATUS_TERMINAL.has("failed"), true); +assert.equal(AUTHORITY_JOB_STATUS_SUCCESS.has("completed"), true); + +const list = normalizeAuthorityJobList({ + items: [{ id: "job-list", status: "running", progress: 33 }], + next_cursor: "cursor-2", +}); +assert.equal(list.jobs.length, 1); +assert.equal(list.jobs[0].progress, 0.33); +assert.equal(list.nextCursor, "cursor-2"); + +const idempotencyKey = buildAuthorityJobIdempotencyKey({ + kind: "authority.vector.rebuild", + chatId: "chat-a", + collectionId: "st-bme::chat-a", + revision: 7, + range: { start: 5, end: 1 }, +}); +assert.equal( + idempotencyKey, + "st-bme:authority.vector.rebuild:chat-a:st-bme::chat-a:7:1-5", +); + +const client = createMockJobClient(); +const adapter = createAuthorityJobAdapter( + { authorityBaseUrl: "/api/plugins/authority" }, + { jobClient: client }, +); +const submitted = await adapter.submit( + "authority.vector.rebuild", + { chatId: "chat-a" }, + { idempotencyKey }, +); +assert.equal(submitted.id, "job-1"); +assert.equal(submitted.idempotencyKey, idempotencyKey); + +const completed = await adapter.waitForCompletion(submitted.id, { timeoutMs: 1000 }); +assert.equal(completed.status, "completed"); +assert.equal(completed.success, true); + +const page = await adapter.listPage({ limit: 10 }); +assert.equal(page.jobs.length, 1); +assert.equal(page.nextCursor, "next-1"); +assert.equal(page.hasMore, true); + +const requeued = await adapter.requeue(submitted.id); +assert.equal(requeued.status, "queued"); +assert.deepEqual(client.calls.map(([name]) => name), [ + "submit", + "get", + "listPage", + "requeue", +]); + +function createVectorControllerRuntime(overrides = {}) { + const calls = []; + const signal = {}; + const runtime = { + calls, + beginStageAbortController(stage) { + calls.push(["beginStageAbortController", stage]); + return { signal }; + }, + ensureCurrentGraphRuntimeState() { + calls.push(["ensureCurrentGraphRuntimeState"]); + }, + ensureGraphMutationReady(label) { + calls.push(["ensureGraphMutationReady", label]); + return true; + }, + finishStageAbortController(stage, controller) { + calls.push(["finishStageAbortController", stage, controller === null ? null : Boolean(controller)]); + }, + getEmbeddingConfig() { + return { mode: "authority", source: "authority-trivium" }; + }, + isAuthorityVectorConfig(config) { + return config?.mode === "authority"; + }, + isBackendVectorConfig(config) { + return config?.mode === "backend"; + }, + refreshPanelLiveState() { + calls.push(["refreshPanelLiveState"]); + }, + saveGraphToChat(payload) { + calls.push(["saveGraphToChat", payload]); + }, + shouldUseAuthorityJobs() { + calls.push(["shouldUseAuthorityJobs"]); + return true; + }, + async submitAuthorityVectorRebuildJob(payload) { + calls.push(["submitAuthorityVectorRebuildJob", payload]); + return { submitted: true, job: { id: "job-vector", status: "queued" } }; + }, + async syncVectorState(payload) { + calls.push(["syncVectorState", payload]); + return { insertedHashes: [], stats: { indexed: 2, pending: 0 } }; + }, + toastr: { + info(message) { + calls.push(["toastr.info", message]); + }, + success(message) { + calls.push(["toastr.success", message]); + }, + warning(message) { + calls.push(["toastr.warning", message]); + }, + }, + validateVectorConfig() { + calls.push(["validateVectorConfig"]); + return { valid: true }; + }, + ...overrides, + }; + return runtime; +} + +const jobControllerRuntime = createVectorControllerRuntime(); +await onRebuildVectorIndexController(jobControllerRuntime); +assert.equal( + jobControllerRuntime.calls.some(([name]) => name === "submitAuthorityVectorRebuildJob"), + true, +); +assert.equal( + jobControllerRuntime.calls.some(([name]) => name === "syncVectorState"), + false, +); +assert.deepEqual( + jobControllerRuntime.calls.find(([name]) => name === "saveGraphToChat")?.[1], + { reason: "authority-vector-rebuild-job-submitted" }, +); + +const fallbackRuntime = createVectorControllerRuntime({ + async submitAuthorityVectorRebuildJob(payload) { + this.calls.push(["submitAuthorityVectorRebuildJob", payload]); + return { submitted: false, error: "job offline" }; + }, +}); +await onRebuildVectorIndexController(fallbackRuntime); +const fallbackSync = fallbackRuntime.calls.find(([name]) => name === "syncVectorState"); +assert.equal(fallbackSync?.[1]?.purge, true); +assert.equal( + fallbackRuntime.calls.some(([name]) => name === "toastr.warning"), + true, +); + +const range = { start: 1, end: 2 }; +const rangeRuntime = createVectorControllerRuntime(); +await onRebuildVectorIndexController(rangeRuntime, range); +assert.equal( + rangeRuntime.calls.some(([name]) => name === "submitAuthorityVectorRebuildJob"), + false, +); +const rangeSync = rangeRuntime.calls.find(([name]) => name === "syncVectorState"); +assert.equal(rangeSync?.[1]?.purge, false); +assert.equal(rangeSync?.[1]?.range, range); + +console.log("authority-jobs tests passed"); diff --git a/ui/panel.js b/ui/panel.js index 40eb457..ca5835d 100644 --- a/ui/panel.js +++ b/ui/panel.js @@ -1975,6 +1975,23 @@ function _refreshTaskPipelineOverview() { if (pipelinePersistDeltaMeta) { persistenceMetaParts.push(pipelinePersistDeltaMeta); } + const authorityJob = loadInfo.authorityLastJob || {}; + const authorityJobParts = [ + authorityJob.id || loadInfo.authorityLastJobId + ? `job ${authorityJob.id || loadInfo.authorityLastJobId}` + : "", + authorityJob.kind || loadInfo.authorityLastJobKind || "", + authorityJob.status || loadInfo.authorityLastJobStatus || "", + ].filter(Boolean); + const authorityJobProgress = Number( + authorityJob.progress ?? loadInfo.authorityLastJobProgress, + ); + if (Number.isFinite(authorityJobProgress) && authorityJobProgress > 0) { + authorityJobParts.push(`${Math.round(authorityJobProgress * 100)}%`); + } + if (loadInfo.authorityLastJobError) { + authorityJobParts.push(`error ${loadInfo.authorityLastJobError}`); + } const persistence = _resolvePipelineStatus({ text: loadInfo.loadState || "unknown", meta: persistenceMetaParts.join(" · "), @@ -2026,7 +2043,16 @@ function _refreshTaskPipelineOverview() { const statusRows = [ { label: "提取", color: extraction.color, value: extraction.label + (extraction.detail ? ` — ${extraction.detail}` : "") }, - { label: "向量", color: vector.color, value: vector.label + (vector.detail ? ` — ${vector.detail}` : "") }, + { + label: "向量", + color: vector.color, + value: + vector.label + + (vector.detail ? ` — ${vector.detail}` : "") + + (authorityJobParts.length + ? ` · Authority ${authorityJobParts.join(" · ")}` + : ""), + }, { label: "召回", color: recall.color, value: recall.label + (recall.detail ? ` — ${recall.detail}` : "") }, { label: "持久化", color: persistence.color, value: persistence.label + (persistence.detail ? ` — ${persistence.detail}` : "") }, ]; diff --git a/ui/ui-actions-controller.js b/ui/ui-actions-controller.js index 9a47c59..2b989ad 100644 --- a/ui/ui-actions-controller.js +++ b/ui/ui-actions-controller.js @@ -587,6 +587,32 @@ export async function onRebuildVectorIndexController(runtime, range = null) { const vectorController = runtime.beginStageAbortController("vector"); try { + if ( + !range && + typeof runtime.shouldUseAuthorityJobs === "function" && + runtime.shouldUseAuthorityJobs(config) && + typeof runtime.submitAuthorityVectorRebuildJob === "function" + ) { + const jobResult = await runtime.submitAuthorityVectorRebuildJob({ + config, + purge: true, + range, + signal: vectorController.signal, + }); + if (jobResult?.submitted) { + runtime.saveGraphToChat({ reason: "authority-vector-rebuild-job-submitted" }); + runtime.toastr.info( + `Authority 向量重建任务已提交:${jobResult.job?.id || "pending"}`, + ); + return; + } + if (jobResult?.error) { + runtime.toastr.warning( + `Authority Job 提交失败,已回退本地重建:${jobResult.error}`, + ); + } + } + const result = await runtime.syncVectorState({ force: true, purge: diff --git a/ui/ui-status.js b/ui/ui-status.js index 2264b8c..6f94c43 100644 --- a/ui/ui-status.js +++ b/ui/ui-status.js @@ -146,6 +146,15 @@ export function createGraphPersistenceState() { authorityMigrationRevision: 0, authorityMigrationLastError: "", lastAuthorityMigrationResult: null, + authorityJobsReady: false, + authorityJobQueueState: "idle", + authorityLastJob: null, + authorityLastJobId: "", + authorityLastJobKind: "", + authorityLastJobStatus: "", + authorityLastJobProgress: 0, + authorityLastJobError: "", + authorityLastJobUpdatedAt: "", localStoreFormatVersion: 1, localStoreMigrationState: "idle", opfsWriteLockState: {