fix(authority): gate vector rebuild jobs

Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
Sisyphus
2026-05-03 19:27:47 +00:00
parent 8b65fcbdb1
commit fecbd1f2a6
4 changed files with 167 additions and 41 deletions

View File

@@ -418,6 +418,8 @@ let _themesModule = null;
const SERVER_SETTINGS_FILENAME = "st-bme-settings.json";
const SERVER_SETTINGS_URL = `/user/files/${SERVER_SETTINGS_FILENAME}`;
const AUTHORITY_VECTOR_REBUILD_JOB_TYPE = "authority.vector.rebuild";
const AUTHORITY_VECTOR_REBUILD_RANGE_JOB_TYPE = "authority.vector.rebuild-range";
function normalizeChatIdCandidate(value = "") {
return String(value ?? "").trim();
@@ -2150,16 +2152,30 @@ function getAuthorityJobAdapter(options = {}) {
});
}
function shouldUseAuthorityJobs(config = null) {
function normalizeAuthorityJobType(kind = "") {
return String(kind || "").trim().toLowerCase();
}
function shouldUseAuthorityJobs(config = null, kind = AUTHORITY_VECTOR_REBUILD_JOB_TYPE) {
const settings = getSettings();
const { capability } = getAuthorityRuntimeSnapshot(settings);
if (
!capability.jobsReady ||
settings.authorityJobsEnabled === false ||
!isAuthorityJobTypeSupported(capability, kind) ||
!isAuthorityVectorConfig(config)
) {
return false;
}
const jobConfig = normalizeAuthorityJobConfig(settings);
return Boolean(
jobConfig.enabled &&
capability.jobsReady &&
settings.authorityJobsEnabled !== false &&
isAuthorityVectorConfig(config),
);
return Boolean(jobConfig.enabled);
}
function isAuthorityJobTypeSupported(capability = {}, kind = "") {
if (!capability?.supportedJobTypesKnown) return true;
const normalizedKind = normalizeAuthorityJobType(kind);
if (!normalizedKind) return true;
return Array.isArray(capability.supportedJobTypes) && capability.supportedJobTypes.includes(normalizedKind);
}
function mergeAuthorityRecentJobsIntoState(incomingJobs = [], options = {}) {
@@ -3329,7 +3345,7 @@ async function rebuildAuthorityTrivium(options = {}) {
const range = options.range || null;
const reason = String(options.reason || "authority-trivium-rebuild");
if (!range && options.useJobs !== false && shouldUseAuthorityJobs(vectorConfig)) {
if (!range && options.useJobs !== false && shouldUseAuthorityJobs(vectorConfig, AUTHORITY_VECTOR_REBUILD_JOB_TYPE)) {
const jobResult = await submitAuthorityVectorRebuildJob({
config: vectorConfig,
range,
@@ -3672,7 +3688,20 @@ async function submitAuthorityVectorRebuildJob({
signal = undefined,
} = {}) {
const vectorConfig = config || getEmbeddingConfig();
if (!shouldUseAuthorityJobs(vectorConfig)) {
const kind = range
? AUTHORITY_VECTOR_REBUILD_RANGE_JOB_TYPE
: AUTHORITY_VECTOR_REBUILD_JOB_TYPE;
const { capability } = getAuthorityRuntimeSnapshot(getSettings());
if (!shouldUseAuthorityJobs(vectorConfig, kind)) {
if (!isAuthorityJobTypeSupported(capability, kind)) {
const message = `Authority Job type ${kind} is not supported by this Authority runtime`;
return {
submitted: false,
fallbackRequired: true,
reason: "authority-job-type-unsupported",
error: message,
};
}
return {
submitted: false,
fallbackRequired: true,
@@ -3684,9 +3713,6 @@ async function submitAuthorityVectorRebuildJob({
const chatId = getCurrentChatId();
const collectionId =
currentGraph?.vectorIndexState?.collectionId || buildVectorCollectionId(chatId);
const kind = range
? "authority.vector.rebuild-range"
: "authority.vector.rebuild";
const idempotencyKey = buildAuthorityJobIdempotencyKey({
kind,
chatId,
@@ -11513,15 +11539,13 @@ async function maybeMigrateLegacyGraphToIndexedDb(
}
try {
const settings = getSettings();
if (shouldUseAuthorityJobs(settings)) {
const vectorConfig = normalizeAuthorityVectorConfig(settings, buildAuthorityGraphStoreOptions(settings));
if (vectorConfig?.mode === "authority") {
await submitAuthorityVectorRebuildJob({
config: vectorConfig,
purge: true,
reason: "authority-migration-trivium-rebuild",
});
}
const vectorConfig = normalizeAuthorityVectorConfig(settings, buildAuthorityGraphStoreOptions(settings));
if (shouldUseAuthorityJobs(vectorConfig, AUTHORITY_VECTOR_REBUILD_JOB_TYPE)) {
await submitAuthorityVectorRebuildJob({
config: vectorConfig,
purge: true,
reason: "authority-migration-trivium-rebuild",
});
}
} catch (vectorJobError) {
console.warn("[ST-BME] 迁移后触发 Trivium 重建 Job 失败(非阻塞):", vectorJobError);
@@ -11789,15 +11813,13 @@ async function maybeImportLegacyIndexedDbSnapshotToLocalStore(
}
try {
const settings = getSettings();
if (shouldUseAuthorityJobs(settings)) {
const vectorConfig = normalizeAuthorityVectorConfig(settings, buildAuthorityGraphStoreOptions(settings));
if (vectorConfig?.mode === "authority") {
await submitAuthorityVectorRebuildJob({
config: vectorConfig,
purge: true,
reason: "authority-migration-trivium-rebuild",
});
}
const vectorConfig = normalizeAuthorityVectorConfig(settings, buildAuthorityGraphStoreOptions(settings));
if (shouldUseAuthorityJobs(vectorConfig, AUTHORITY_VECTOR_REBUILD_JOB_TYPE)) {
await submitAuthorityVectorRebuildJob({
config: vectorConfig,
purge: true,
reason: "authority-migration-trivium-rebuild",
});
}
} catch (vectorJobError) {
console.warn("[ST-BME] 迁移后触发 Trivium 重建 Job 失败(非阻塞):", vectorJobError);
@@ -12048,15 +12070,13 @@ async function maybeImportLegacyOpfsSnapshotToLocalStore(
}
try {
const settings = getSettings();
if (shouldUseAuthorityJobs(settings)) {
const vectorConfig = normalizeAuthorityVectorConfig(settings, buildAuthorityGraphStoreOptions(settings));
if (vectorConfig?.mode === "authority") {
await submitAuthorityVectorRebuildJob({
config: vectorConfig,
purge: true,
reason: "authority-migration-trivium-rebuild",
});
}
const vectorConfig = normalizeAuthorityVectorConfig(settings, buildAuthorityGraphStoreOptions(settings));
if (shouldUseAuthorityJobs(vectorConfig, AUTHORITY_VECTOR_REBUILD_JOB_TYPE)) {
await submitAuthorityVectorRebuildJob({
config: vectorConfig,
purge: true,
reason: "authority-migration-trivium-rebuild",
});
}
} catch (vectorJobError) {
console.warn("[ST-BME] 迁移后触发 Trivium 重建 Job 失败(非阻塞):", vectorJobError);
@@ -24261,4 +24281,3 @@ async function onCompactLukerSidecar() {
}
debugLog("[ST-BME] 初始化完成");
})();

View File

@@ -634,4 +634,55 @@ assert.deepEqual(JSON.parse(String(httpRequests[3].options.body || "{}")), {
},
});
// Regression: onRebuildVectorIndexController skips submitAuthorityVectorRebuildJob when
// shouldUseAuthorityJobs() returns false and still runs syncVectorState with purge: true
// for Authority vector config.
const noJobsRuntime = createVectorControllerRuntime({
shouldUseAuthorityJobs() {
this.calls.push(["shouldUseAuthorityJobs"]);
return false;
},
});
await onRebuildVectorIndexController(noJobsRuntime);
assert.equal(
noJobsRuntime.calls.some(([name]) => name === "submitAuthorityVectorRebuildJob"),
false,
"submitAuthorityVectorRebuildJob should NOT be called when shouldUseAuthorityJobs returns false",
);
const noJobsSync = noJobsRuntime.calls.find(([name]) => name === "syncVectorState");
assert.equal(noJobsSync?.[1]?.purge, true, "syncVectorState should use purge:true for Authority config fallback");
assert.equal(
noJobsRuntime.calls.some(([name]) => name === "saveGraphToChat"),
true,
"saveGraphToChat should still be called in fallback path",
);
// Regression: Fallback warning wording no longer says 本地重建; instead it uses
// direct/synchronous rebuild wording.
const fallbackWarningRuntime = createVectorControllerRuntime({
async submitAuthorityVectorRebuildJob(payload) {
this.calls.push(["submitAuthorityVectorRebuildJob", payload]);
return { submitted: false, error: "job-type-unsupported" };
},
});
await onRebuildVectorIndexController(fallbackWarningRuntime);
const fallbackWarningMessage = fallbackWarningRuntime.calls.find(
([name]) => name === "toastr.warning",
)?.[1];
assert.equal(
typeof fallbackWarningMessage === "string",
true,
"a warning toast should be emitted when job submission fails",
);
assert.equal(
fallbackWarningMessage.includes("本地重建"),
false,
"fallback warning should NOT contain 本地重建",
);
assert.equal(
/直接|同步|direct|synchronous/i.test(fallbackWarningMessage),
true,
"fallback warning should contain direct/synchronous rebuild wording",
);
console.log("authority-jobs tests passed");

View File

@@ -107,6 +107,7 @@ import {
normalizeAuthorityCapabilityState,
probeAuthorityCapabilities,
} from "../runtime/authority-capabilities.js";
import { normalizeAuthorityBlobConfig } from "../maintenance/authority-blob-adapter.js";
import {
createAuthorityBrowserState,
getAuthorityBrowserStateSnapshot,
@@ -138,6 +139,27 @@ const moduleDir = path.dirname(fileURLToPath(import.meta.url));
const indexPath = path.resolve(moduleDir, "../index.js");
const indexSource = await fs.readFile(indexPath, "utf8");
function isAuthorityVectorConfig(config = null) {
return config?.mode === "authority" || config?.source === "authority-trivium";
}
function normalizeAuthorityVectorConfig(settings = {}, overrides = {}) {
return {
...overrides,
mode: "authority",
source: "authority-trivium",
baseUrl: String(settings?.authorityBaseUrl || overrides?.baseUrl || "/api/plugins/authority"),
};
}
function createAuthorityBlobAdapter() {
return {
async writeJson(path = "", payload = null) {
return { path, payload, written: true };
},
};
}
function extractSnippet(startMarker, endMarker) {
const start = indexSource.indexOf(startMarker);
const end = indexSource.indexOf(endMarker);
@@ -629,6 +651,10 @@ async function createGraphPersistenceHarness({
normalizeAuthoritySettings,
normalizeAuthorityCapabilityState,
probeAuthorityCapabilities,
isAuthorityVectorConfig,
normalizeAuthorityVectorConfig,
normalizeAuthorityBlobConfig,
createAuthorityBlobAdapter,
createAuthorityBrowserState,
getAuthorityBrowserStateSnapshot,
normalizeAuthorityBrowserState,
@@ -1581,6 +1607,7 @@ result = {
maybeFlushQueuedGraphPersist,
retryPendingGraphPersist,
persistExtractionBatchResult,
shouldUseAuthorityJobs,
onRebuildLocalCacheFromLukerSidecar,
saveGraphToIndexedDb,
cloneGraphForPersistence,
@@ -3993,6 +4020,12 @@ result = {
sessionReady: true,
permissionReady: true,
features: ["sql.query", "sql.mutation", "trivium.search", "jobs", "blob"],
jobs: {
builtinTypes: ["delay", "sql.backup", "trivium.flush", "fs.import-jsonl"],
registry: {
jobTypes: ["delay", "sql.backup", "trivium.flush", "fs.import-jsonl"],
},
},
reason: "ok",
lastProbeAt: Date.now(),
});
@@ -4037,6 +4070,29 @@ result = {
);
}
{
const harness = await createGraphPersistenceHarness({
chatId: "chat-authority-empty-jobs",
globalChatId: "chat-authority-empty-jobs",
});
harness.api.setAuthorityCapabilityState({
installed: true,
healthy: true,
sessionReady: true,
permissionReady: true,
features: ["sql.query", "sql.mutation", "trivium.search", "jobs", "blob"],
supportedJobTypes: [],
supportedJobTypesKnown: true,
reason: "ok",
});
assert.equal(
harness.api.shouldUseAuthorityJobs({ mode: "authority", source: "authority-trivium" }),
false,
"显式空 Authority job 白名单应阻止 vector rebuild job 提交",
);
}
{
const harness = await createGraphPersistenceHarness({
chatId: "chat-b",

View File

@@ -608,7 +608,7 @@ export async function onRebuildVectorIndexController(runtime, range = null) {
}
if (jobResult?.error) {
runtime.toastr.warning(
`Authority Job 提交失败,已回退本地重建:${jobResult.error}`,
`Authority Job 提交失败,已回退直接同步重建:${jobResult.error}`,
);
}
}