Harden runtime debug and task pipeline

This commit is contained in:
Youzini-afk
2026-03-27 01:26:56 +08:00
parent b20e8dbb44
commit c31af1d1a4
17 changed files with 1750 additions and 238 deletions

View File

@@ -37,7 +37,10 @@ const openAiShimUrl = `data:text/javascript,${encodeURIComponent(
registerHooks({
resolve(specifier, context, nextResolve) {
if (specifier === "../../../extensions.js") {
if (
specifier === "../../../extensions.js" ||
specifier === "../../../../extensions.js"
) {
return {
shortCircuit: true,
url: extensionsShimUrl,
@@ -82,6 +85,8 @@ const {
normalizeGraphRuntimeState,
rollbackBatch,
} = await import("../runtime-state.js");
const { createDefaultTaskProfiles } = await import("../prompt-profiles.js");
const extensionsApi = await import("../../../../extensions.js");
const llm = await import("../llm.js");
const embedding = await import("../embedding.js");
@@ -237,6 +242,159 @@ function createGenerationRecallHarness() {
});
}
function createRerollHarness() {
const indexPath = path.resolve("./index.js");
return fs.readFile(indexPath, "utf8").then((source) => {
const helperStart = source.indexOf(
"function pruneProcessedMessageHashesFromFloor(",
);
const helperEnd = source.indexOf("async function recoverHistoryIfNeeded(");
const rerollStart = source.indexOf("async function onReroll(");
const rerollEnd = source.indexOf("async function onManualSleep()");
if (
helperStart < 0 ||
helperEnd < 0 ||
rerollStart < 0 ||
rerollEnd < 0 ||
helperEnd <= helperStart ||
rerollEnd <= rerollStart
) {
throw new Error("无法从 index.js 提取 reroll 定义");
}
const snippet = [
source.slice(helperStart, helperEnd),
source.slice(rerollStart, rerollEnd),
]
.join("\n")
.replace(/^export\s+/gm, "");
const context = {
console,
Date,
result: null,
currentGraph: null,
isExtracting: false,
extractionCount: 0,
lastExtractedItems: ["stale-node"],
lastExtractionStatus: { level: "idle" },
chat: [],
embeddingConfig: { mode: "backend" },
rollbackAffectedJournalsCalls: [],
deletedHashesCalls: [],
prepareVectorStateCalls: [],
recoveryPlans: [],
saveGraphToChatCalls: 0,
refreshPanelCalls: 0,
clearInjectionCalls: 0,
onManualExtractCalls: 0,
clearedHistoryDirty: null,
postRollbackGraph: null,
manualExtractLevel: "success",
ensureCurrentGraphRuntimeState() {
return context.currentGraph;
},
getContext() {
return {
chat: context.chat,
chatId: "chat-main",
};
},
getCurrentChatId() {
return "chat-main";
},
getAssistantTurns(chat = []) {
return chat.flatMap((message, index) =>
!message?.is_user && !message?.is_system ? [index] : [],
);
},
getLastProcessedAssistantFloor() {
return Number(
context.currentGraph?.historyState?.lastProcessedAssistantFloor ?? -1,
);
},
findJournalRecoveryPoint(graph, floor) {
return context.findJournalRecoveryPointImpl(graph, floor);
},
findJournalRecoveryPointImpl() {
return null;
},
buildReverseJournalRecoveryPlan(...args) {
return context.buildReverseJournalRecoveryPlanImpl(...args);
},
buildReverseJournalRecoveryPlanImpl() {
return {
backendDeleteHashes: [],
replayRequiredNodeIds: [],
pendingRepairFromFloor: null,
legacyGapFallback: false,
dirtyReason: "history-recovery-replay",
};
},
rollbackAffectedJournals(graph, journals) {
context.rollbackAffectedJournalsCalls.push({ graph, journals });
if (context.postRollbackGraph) {
context.currentGraph = context.postRollbackGraph;
}
},
normalizeGraphRuntimeState(graph) {
return graph;
},
getEmbeddingConfig() {
return context.embeddingConfig;
},
applyRecoveryPlanToVectorState(plan, floor) {
context.recoveryPlans.push({ plan, floor });
},
isBackendVectorConfig(config) {
return config?.mode === "backend";
},
async deleteBackendVectorHashesForRecovery(...args) {
context.deletedHashesCalls.push(args);
},
async prepareVectorStateForReplay(...args) {
context.prepareVectorStateCalls.push(args);
},
clearHistoryDirty(graph, result) {
context.clearedHistoryDirty = result;
graph.historyState ||= {};
graph.historyState.historyDirtyFrom = null;
graph.historyState.lastRecoveryResult = result;
},
buildRecoveryResult(status, extra = {}) {
return {
status,
...extra,
};
},
saveGraphToChat() {
context.saveGraphToChatCalls += 1;
return true;
},
refreshPanelLiveState() {
context.refreshPanelCalls += 1;
},
clearInjectionState() {
context.clearInjectionCalls += 1;
},
async onManualExtract() {
context.onManualExtractCalls += 1;
context.lastExtractionStatus = { level: context.manualExtractLevel };
},
toastr: {
info() {},
error() {},
success() {},
},
};
vm.createContext(context);
vm.runInContext(
`${snippet}\nresult = { pruneProcessedMessageHashesFromFloor, rollbackGraphForReroll, onReroll };`,
context,
{ filename: indexPath },
);
return context;
});
}
function pushTestOverrides(patch = {}) {
const previous = globalThis.__stBmeTestOverrides || {};
globalThis.__stBmeTestOverrides = {
@@ -1138,6 +1296,337 @@ async function testGenerationRecallDifferentKeyCanRunAgain() {
);
}
async function testRerollUsesBatchBoundaryRollbackAndPersistsState() {
const harness = await createRerollHarness();
harness.chat = [
{ is_user: true, mes: "u1" },
{ is_user: false, mes: "a1" },
{ is_user: true, mes: "u2" },
{ is_user: false, mes: "a2" },
{ is_user: true, mes: "u3" },
{ is_user: false, mes: "a3" },
];
harness.currentGraph = {
historyState: {
lastProcessedAssistantFloor: 5,
processedMessageHashes: {
1: "hash-1",
3: "hash-3",
5: "hash-5",
},
},
vectorIndexState: {
collectionId: "col-1",
},
batchJournal: [{ id: "journal-1" }],
lastProcessedSeq: 5,
};
harness.postRollbackGraph = {
historyState: {
lastProcessedAssistantFloor: 1,
processedMessageHashes: {
1: "hash-1",
3: "stale-hash",
},
},
vectorIndexState: {
collectionId: "col-1",
},
batchJournal: [],
lastProcessedSeq: 1,
};
harness.findJournalRecoveryPointImpl = () => ({
path: "reverse-journal",
affectedBatchCount: 1,
affectedJournals: [{ id: "journal-1" }],
});
harness.buildReverseJournalRecoveryPlanImpl = () => ({
backendDeleteHashes: ["hash-old"],
replayRequiredNodeIds: ["node-1"],
pendingRepairFromFloor: 2,
legacyGapFallback: false,
dirtyReason: "history-recovery-replay",
});
const result = await harness.result.onReroll({ fromFloor: 3 });
assert.equal(result.success, true);
assert.equal(result.rollbackPerformed, true);
assert.equal(result.recoveryPath, "reverse-journal");
assert.equal(result.effectiveFromFloor, 2);
assert.equal(harness.rollbackAffectedJournalsCalls.length, 1);
assert.equal(harness.deletedHashesCalls.length, 1);
assert.equal(harness.prepareVectorStateCalls.length, 1);
assert.equal(harness.prepareVectorStateCalls[0][2].skipBackendPurge, true);
assert.equal(harness.saveGraphToChatCalls, 1);
assert.equal(harness.refreshPanelCalls, 1);
assert.equal(harness.clearInjectionCalls, 1);
assert.equal(harness.onManualExtractCalls, 1);
assert.equal(harness.currentGraph.historyState.processedMessageHashes[3], undefined);
assert.equal(harness.lastExtractedItems.length, 0);
}
async function testRerollRejectsMissingRecoveryPoint() {
const harness = await createRerollHarness();
harness.chat = [
{ is_user: true, mes: "u1" },
{ is_user: false, mes: "a1" },
{ is_user: true, mes: "u2" },
{ is_user: false, mes: "a2" },
];
harness.currentGraph = {
historyState: {
lastProcessedAssistantFloor: 3,
processedMessageHashes: {
1: "hash-1",
3: "hash-3",
},
},
vectorIndexState: {
collectionId: "col-1",
},
batchJournal: [],
lastProcessedSeq: 3,
};
const result = await harness.result.onReroll({ fromFloor: 3 });
assert.equal(result.success, false);
assert.equal(result.recoveryPath, "unavailable");
assert.equal(harness.onManualExtractCalls, 0);
assert.equal(harness.saveGraphToChatCalls, 0);
}
async function testRerollFallsBackToDirectExtractForUnprocessedFloor() {
const harness = await createRerollHarness();
harness.chat = [
{ is_user: true, mes: "u1" },
{ is_user: false, mes: "a1" },
{ is_user: true, mes: "u2" },
{ is_user: false, mes: "a2" },
];
harness.currentGraph = {
historyState: {
lastProcessedAssistantFloor: 1,
processedMessageHashes: {
1: "hash-1",
},
},
vectorIndexState: {
collectionId: "col-1",
},
batchJournal: [],
lastProcessedSeq: 1,
};
const result = await harness.result.onReroll({ fromFloor: 3 });
assert.equal(result.success, true);
assert.equal(result.rollbackPerformed, false);
assert.equal(result.recoveryPath, "direct-extract");
assert.equal(result.effectiveFromFloor, 2);
assert.equal(harness.onManualExtractCalls, 1);
assert.equal(harness.saveGraphToChatCalls, 0);
}
async function testLlmDebugSnapshotRedactsSecretsBeforeStorage() {
const originalFetch = globalThis.fetch;
const previousSettings = JSON.parse(
JSON.stringify(extensionsApi.extension_settings.st_bme || {}),
);
delete globalThis.__stBmeRuntimeDebugState;
extensionsApi.extension_settings.st_bme = {
...previousSettings,
llmApiUrl: "https://example.com/v1",
llmApiKey: "sk-secret-redaction",
llmModel: "gpt-test",
timeoutMs: 1234,
};
globalThis.fetch = async () =>
new Response(
JSON.stringify({
choices: [
{
message: {
content: '{"ok":true}',
},
finish_reason: "stop",
},
],
}),
{
status: 200,
headers: {
"Content-Type": "application/json",
},
},
);
try {
const result = await llm.callLLMForJSON({
systemPrompt: "system",
userPrompt: "user",
maxRetries: 0,
requestSource: "test:redaction",
});
assert.deepEqual(result, { ok: true });
const snapshot =
globalThis.__stBmeRuntimeDebugState?.taskLlmRequests?.["test:redaction"];
assert.ok(snapshot);
assert.equal(snapshot.redacted, true);
const serialized = JSON.stringify(snapshot);
assert.doesNotMatch(serialized, /sk-secret-redaction/);
assert.match(serialized, /\[REDACTED\]/);
} finally {
globalThis.fetch = originalFetch;
extensionsApi.extension_settings.st_bme = previousSettings;
}
}
async function testEmbeddingUsesConfigTimeoutInsteadOfDefault() {
const originalFetch = globalThis.fetch;
const originalSetTimeout = globalThis.setTimeout;
const originalClearTimeout = globalThis.clearTimeout;
let capturedDelay = null;
globalThis.setTimeout = (fn, delay, ...args) => {
capturedDelay = delay;
return originalSetTimeout(fn, 0, ...args);
};
globalThis.clearTimeout = originalClearTimeout;
globalThis.fetch = async (_url, options = {}) =>
await new Promise((resolve, reject) => {
options.signal?.addEventListener(
"abort",
() => reject(options.signal.reason),
{ once: true },
);
});
try {
await assert.rejects(
embedding.embedText("timeout test", {
apiUrl: "https://example.com/v1",
model: "text-embedding-test",
timeoutMs: 7,
}),
/Embedding 请求超时/,
);
assert.equal(capturedDelay, 7);
} finally {
globalThis.fetch = originalFetch;
globalThis.setTimeout = originalSetTimeout;
globalThis.clearTimeout = originalClearTimeout;
}
}
async function testLlmOutputRegexCleansResponseBeforeJsonParse() {
const originalFetch = globalThis.fetch;
const previousSettings = JSON.parse(
JSON.stringify(extensionsApi.extension_settings.st_bme || {}),
);
delete globalThis.__stBmeRuntimeDebugState;
const taskProfiles = createDefaultTaskProfiles();
taskProfiles.extract.profiles[0].regex = {
...taskProfiles.extract.profiles[0].regex,
enabled: true,
inheritStRegex: false,
stages: {
...taskProfiles.extract.profiles[0].regex.stages,
"output.rawResponse": true,
"output.beforeParse": true,
},
localRules: [
{
id: "strip-prefix",
script_name: "strip-prefix",
enabled: true,
find_regex: "/^NOTE:\\s*/g",
replace_string: "",
trim_strings: [],
source: {
ai_output: true,
},
destination: {
prompt: true,
display: false,
},
},
{
id: "strip-suffix",
script_name: "strip-suffix",
enabled: true,
find_regex: "/\\s*END$/g",
replace_string: "",
trim_strings: [],
source: {
ai_output: true,
},
destination: {
prompt: true,
display: false,
},
},
],
};
extensionsApi.extension_settings.st_bme = {
...previousSettings,
llmApiUrl: "https://example.com/v1",
llmApiKey: "sk-secret-redaction",
llmModel: "gpt-test",
taskProfilesVersion: 1,
taskProfiles,
};
globalThis.fetch = async () =>
new Response(
JSON.stringify({
choices: [
{
message: {
content: 'NOTE: {"ok":true} END',
},
finish_reason: "stop",
},
],
}),
{
status: 200,
headers: {
"Content-Type": "application/json",
},
},
);
try {
const result = await llm.callLLMForJSON({
systemPrompt: "system",
userPrompt: "user",
maxRetries: 0,
taskType: "extract",
requestSource: "test:output-regex",
});
assert.deepEqual(result, { ok: true });
const snapshot =
globalThis.__stBmeRuntimeDebugState?.taskLlmRequests?.extract;
assert.ok(snapshot);
assert.equal(snapshot.responseCleaning?.applied, true);
assert.equal(snapshot.responseCleaning?.changed, true);
assert.deepEqual(
snapshot.responseCleaning?.stages?.map((entry) => entry.stage),
["output.rawResponse", "output.beforeParse"],
);
} finally {
globalThis.fetch = originalFetch;
extensionsApi.extension_settings.st_bme = previousSettings;
}
}
await testCompressorMigratesEdgesToCompressedNode();
await testVectorIndexKeepsDirtyOnDirectPartialEmbeddingFailure();
await testExtractorFailsOnUnknownOperation();
@@ -1155,5 +1644,11 @@ await testProcessedHistoryAdvanceRequiresCompleteStrongSuccess();
await testGenerationRecallTransactionDedupesDoubleHookBySameKey();
await testGenerationRecallBeforeCombineRunsStandalone();
await testGenerationRecallDifferentKeyCanRunAgain();
await testRerollUsesBatchBoundaryRollbackAndPersistsState();
await testRerollRejectsMissingRecoveryPoint();
await testRerollFallsBackToDirectExtractForUnprocessedFloor();
await testLlmDebugSnapshotRedactsSecretsBeforeStorage();
await testEmbeddingUsesConfigTimeoutInsteadOfDefault();
await testLlmOutputRegexCleansResponseBeforeJsonParse();
console.log("p0-regressions tests passed");