feat: 支持终止进行中的运行流程

This commit is contained in:
Youzini-afk
2026-03-25 01:39:12 +08:00
parent cf067a4fcd
commit 8276aa1ff8
8 changed files with 396 additions and 54 deletions

252
index.js
View File

@@ -196,6 +196,12 @@ const stageNoticeHandles = {
recall: null,
history: null,
};
const stageAbortControllers = {
extraction: null,
vector: null,
recall: null,
history: null,
};
function createUiStatus(text = "待命", meta = "", level = "idle") {
return {
@@ -214,6 +220,87 @@ function normalizeStageNoticeLevel(level = "info") {
return "info";
}
function createAbortError(message = "操作已终止") {
const error = new Error(message);
error.name = "AbortError";
return error;
}
function isAbortError(error) {
return error?.name === "AbortError";
}
function throwIfAborted(signal, message = "操作已终止") {
if (signal?.aborted) {
throw signal.reason instanceof Error
? signal.reason
: createAbortError(message);
}
}
function getStageAbortLabel(stage) {
switch (stage) {
case "extraction":
return "提取";
case "vector":
return "向量";
case "recall":
return "召回";
case "history":
return "历史恢复";
default:
return "当前流程";
}
}
function beginStageAbortController(stage) {
const controller = new AbortController();
stageAbortControllers[stage] = controller;
return controller;
}
function finishStageAbortController(stage, controller = null) {
if (!controller || stageAbortControllers[stage] === controller) {
stageAbortControllers[stage] = null;
}
}
function findAbortableStageForNotice(stage) {
const preferred = [stage];
if (stage === "vector") {
preferred.push("history", "extraction", "recall");
}
for (const candidate of preferred) {
const controller = stageAbortControllers[candidate];
if (controller && !controller.signal.aborted) {
return candidate;
}
}
return null;
}
function abortStage(stage) {
const controller = stageAbortControllers[stage];
if (!controller || controller.signal.aborted) return false;
controller.abort(createAbortError(`${getStageAbortLabel(stage)}已终止`));
return true;
}
function buildAbortStageAction(stage) {
const abortStageName = findAbortableStageForNotice(stage);
if (!abortStageName) return undefined;
return {
label: `终止${getStageAbortLabel(abortStageName)}`,
kind: "danger",
onClick: () => {
abortStage(abortStageName);
},
};
}
function getStageNoticeTitle(stage) {
switch (stage) {
case "extraction":
@@ -264,6 +351,12 @@ function dismissAllStageNotices() {
}
}
function abortAllRunningStages() {
for (const stage of Object.keys(stageAbortControllers)) {
abortStage(stage);
}
}
function updateStageNotice(
stage,
text,
@@ -284,9 +377,12 @@ function updateStageNotice(
persist,
duration_ms: options.duration_ms ?? getStageNoticeDuration(noticeLevel),
action:
options.action === undefined &&
(noticeLevel === "warning" || noticeLevel === "error")
? createNoticePanelAction()
options.action === undefined
? (busy
? buildAbortStageAction(stage)
: (noticeLevel === "warning" || noticeLevel === "error")
? createNoticePanelAction()
: undefined)
: options.action,
};
@@ -707,12 +803,14 @@ async function recordGraphMutation({
processedRange = null,
artifactTags = [],
syncRange = null,
signal = undefined,
} = {}) {
ensureCurrentGraphRuntimeState();
const vectorSync = await syncVectorState({
force: true,
purge: isBackendVectorConfig(getEmbeddingConfig()) && !syncRange,
range: syncRange,
signal,
});
const afterSnapshot = cloneGraphSnapshot(currentGraph);
const effectiveRange = Array.isArray(processedRange)
@@ -784,6 +882,7 @@ async function syncVectorState({
force = false,
purge = false,
range = null,
signal = undefined,
} = {}) {
ensureCurrentGraphRuntimeState();
const scopeLabel =
@@ -818,6 +917,7 @@ async function syncVectorState({
force,
purge,
range,
signal,
});
setLastVectorStatus(
"向量完成",
@@ -827,6 +927,17 @@ async function syncVectorState({
);
return result;
} catch (error) {
if (isAbortError(error)) {
setLastVectorStatus("向量已终止", scopeLabel, "warning", {
syncRuntime: false,
});
return {
insertedHashes: [],
stats: getVectorIndexStats(currentGraph),
error: error?.message || "向量任务已终止",
aborted: true,
};
}
const message = error?.message || String(error) || "向量同步失败";
markVectorStateDirty(message);
console.error("[ST-BME] 向量同步失败:", error);
@@ -842,7 +953,7 @@ async function syncVectorState({
}
}
async function ensureVectorReadyIfNeeded(reason = "vector-ready-check") {
async function ensureVectorReadyIfNeeded(reason = "vector-ready-check", signal = undefined) {
if (!currentGraph) return;
ensureCurrentGraphRuntimeState();
@@ -855,6 +966,7 @@ async function ensureVectorReadyIfNeeded(reason = "vector-ready-check") {
const result = await syncVectorState({
force: true,
purge: isBackendVectorConfig(config),
signal,
});
if (result?.error) {
@@ -993,6 +1105,7 @@ function updateModuleSettings(patch = {}) {
Object.prototype.hasOwnProperty.call(patch, "enabled") &&
patch.enabled === false
) {
abortAllRunningStages();
dismissAllStageNotices();
try {
const context = getContext();
@@ -1309,9 +1422,10 @@ function getCurrentChatSeq(context = getContext()) {
return currentGraph?.lastProcessedSeq ?? 0;
}
async function handleExtractionSuccess(result, endIdx, settings) {
async function handleExtractionSuccess(result, endIdx, settings, signal = undefined) {
const postProcessArtifacts = [];
const warnings = [];
throwIfAborted(signal, "提取已终止");
extractionCount++;
updateLastExtractedItems(result.newNodeIds || []);
@@ -1323,9 +1437,11 @@ async function handleExtractionSuccess(result, endIdx, settings) {
embeddingConfig: getEmbeddingConfig(),
options: { neighborCount: settings.evoNeighborCount },
customPrompt: settings.evolutionPrompt || undefined,
signal,
});
postProcessArtifacts.push("evolution");
} catch (e) {
if (isAbortError(e)) throw e;
console.error("[ST-BME] 记忆进化失败:", e);
}
}
@@ -1337,9 +1453,11 @@ async function handleExtractionSuccess(result, endIdx, settings) {
schema: getSchema(),
currentSeq: endIdx,
customPrompt: settings.synopsisPrompt || undefined,
signal,
});
postProcessArtifacts.push("synopsis");
} catch (e) {
if (isAbortError(e)) throw e;
console.error("[ST-BME] 概要生成失败:", e);
}
}
@@ -1353,9 +1471,11 @@ async function handleExtractionSuccess(result, endIdx, settings) {
graph: currentGraph,
currentSeq: endIdx,
customPrompt: settings.reflectionPrompt || undefined,
signal,
});
postProcessArtifacts.push("reflection");
} catch (e) {
if (isAbortError(e)) throw e;
console.error("[ST-BME] 反思生成失败:", e);
}
}
@@ -1370,23 +1490,29 @@ async function handleExtractionSuccess(result, endIdx, settings) {
}
try {
throwIfAborted(signal, "提取已终止");
const compressionResult = await compressAll(
currentGraph,
getSchema(),
getEmbeddingConfig(),
false,
settings.compressPrompt || undefined,
signal,
);
if (compressionResult.created > 0 || compressionResult.archived > 0) {
postProcessArtifacts.push("compression");
}
} catch (error) {
if (isAbortError(error)) throw error;
const message = error?.message || String(error) || "压缩阶段失败";
warnings.push(`压缩阶段失败: ${message}`);
console.error("[ST-BME] 记忆压缩失败:", error);
}
const vectorSync = await syncVectorState();
const vectorSync = await syncVectorState({ signal });
if (vectorSync?.aborted) {
throw createAbortError(vectorSync.error || "提取已终止");
}
if (vectorSync?.error) {
warnings.push(`向量同步失败: ${vectorSync.error}`);
}
@@ -1520,12 +1646,13 @@ function inspectHistoryMutation(trigger = "history-change") {
return detection;
}
async function purgeCurrentVectorCollection() {
async function purgeCurrentVectorCollection(signal = undefined) {
if (!currentGraph?.vectorIndexState?.collectionId) return;
const response = await fetchLocalWithTimeout("/api/vector/purge", {
method: "POST",
headers: getRequestHeaders(),
signal,
body: JSON.stringify({
collectionId: currentGraph.vectorIndexState.collectionId,
}),
@@ -1537,14 +1664,17 @@ async function purgeCurrentVectorCollection() {
}
}
async function prepareVectorStateForReplay(fullReset = false) {
async function prepareVectorStateForReplay(fullReset = false, signal = undefined) {
ensureCurrentGraphRuntimeState();
const config = getEmbeddingConfig();
if (isBackendVectorConfig(config)) {
try {
await purgeCurrentVectorCollection();
await purgeCurrentVectorCollection(signal);
} catch (error) {
if (isAbortError(error)) {
throw error;
}
console.warn("[ST-BME] 清理后端向量索引失败,继续本地恢复:", error);
}
currentGraph.vectorIndexState.hashToNodeId = {};
@@ -1568,8 +1698,10 @@ async function executeExtractionBatch({
endIdx,
settings,
smartTriggerDecision = null,
signal = undefined,
} = {}) {
ensureCurrentGraphRuntimeState();
throwIfAborted(signal, "提取已终止");
const lastProcessed = getLastProcessedAssistantFloor();
const beforeSnapshot = cloneGraphSnapshot(currentGraph);
const messages = buildExtractionMessages(chat, startIdx, endIdx, settings);
@@ -1594,6 +1726,7 @@ async function executeExtractionBatch({
enablePreciseConflict: settings.enablePreciseConflict,
conflictThreshold: settings.conflictThreshold,
},
signal,
});
if (!result.success) {
@@ -1605,7 +1738,7 @@ async function executeExtractionBatch({
};
}
const effects = await handleExtractionSuccess(result, endIdx, settings);
const effects = await handleExtractionSuccess(result, endIdx, settings, signal);
updateProcessedHistorySnapshot(chat, endIdx);
const afterSnapshot = cloneGraphSnapshot(currentGraph);
@@ -1632,10 +1765,11 @@ async function executeExtractionBatch({
};
}
async function replayExtractionFromHistory(chat, settings) {
async function replayExtractionFromHistory(chat, settings, signal = undefined) {
let replayedBatches = 0;
while (true) {
throwIfAborted(signal, "历史恢复已终止");
const pendingAssistantTurns = getAssistantTurns(chat).filter(
(index) => index > getLastProcessedAssistantFloor(),
);
@@ -1651,6 +1785,7 @@ async function replayExtractionFromHistory(chat, settings) {
startIdx,
endIdx,
settings,
signal,
});
if (!batchResult.success) {
@@ -1693,6 +1828,8 @@ async function recoverHistoryIfNeeded(trigger = "history-recovery") {
: detection.earliestAffectedFloor;
let replayedBatches = 0;
let usedFullRebuild = false;
const historyController = beginStageAbortController("history");
const historySignal = historyController.signal;
updateStageNotice(
"history",
@@ -1708,6 +1845,7 @@ async function recoverHistoryIfNeeded(trigger = "history-recovery") {
);
try {
throwIfAborted(historySignal, "历史恢复已终止");
const recoveryPoint = findJournalRecoveryPoint(currentGraph, initialDirtyFrom);
if (recoveryPoint) {
currentGraph = normalizeGraphRuntimeState(
@@ -1719,8 +1857,8 @@ async function recoverHistoryIfNeeded(trigger = "history-recovery") {
usedFullRebuild = true;
}
await prepareVectorStateForReplay(usedFullRebuild);
replayedBatches = await replayExtractionFromHistory(chat, settings);
await prepareVectorStateForReplay(usedFullRebuild, historySignal);
replayedBatches = await replayExtractionFromHistory(chat, settings, historySignal);
clearHistoryDirty(
currentGraph,
@@ -1750,12 +1888,26 @@ async function recoverHistoryIfNeeded(trigger = "history-recovery") {
);
return true;
} catch (error) {
if (isAbortError(error)) {
updateStageNotice(
"history",
"历史恢复已终止",
error?.message || "已手动终止当前恢复流程",
"warning",
{
busy: false,
persist: false,
},
);
saveGraphToChat();
return false;
}
console.error("[ST-BME] 历史恢复失败,尝试全量重建:", error);
try {
currentGraph = normalizeGraphRuntimeState(createEmptyGraph(), chatId);
await prepareVectorStateForReplay(true);
replayedBatches = await replayExtractionFromHistory(chat, settings);
await prepareVectorStateForReplay(true, historySignal);
replayedBatches = await replayExtractionFromHistory(chat, settings, historySignal);
clearHistoryDirty(
currentGraph,
buildRecoveryResult("full-rebuild", {
@@ -1799,6 +1951,7 @@ async function recoverHistoryIfNeeded(trigger = "history-recovery") {
return false;
}
} finally {
finishStageAbortController("history", historyController);
isRecoveringHistory = false;
}
}
@@ -1850,6 +2003,8 @@ async function runExtraction() {
);
isExtracting = true;
const extractionController = beginStageAbortController("extraction");
const extractionSignal = extractionController.signal;
try {
const batchResult = await executeExtractionBatch({
@@ -1858,6 +2013,7 @@ async function runExtraction() {
endIdx,
settings,
smartTriggerDecision,
signal: extractionSignal,
});
if (!batchResult.success) {
@@ -1877,9 +2033,16 @@ async function runExtraction() {
{ syncRuntime: true },
);
} catch (e) {
if (isAbortError(e)) {
setLastExtractionStatus("提取已终止", e?.message || "已手动终止当前提取", "warning", {
syncRuntime: true,
});
return;
}
console.error("[ST-BME] 提取失败:", e);
notifyExtractionIssue(e?.message || String(e) || "自动提取失败");
} finally {
finishStageAbortController("extraction", extractionController);
isExtracting = false;
}
}
@@ -1894,15 +2057,16 @@ async function runRecall() {
if (!settings.enabled || !settings.recallEnabled) return;
if (!(await recoverHistoryIfNeeded("pre-recall"))) return;
await ensureVectorReadyIfNeeded("pre-recall");
const context = getContext();
const chat = context.chat;
if (!chat || chat.length === 0) return;
isRecalling = true;
const recallController = beginStageAbortController("recall");
const recallSignal = recallController.signal;
try {
await ensureVectorReadyIfNeeded("pre-recall", recallSignal);
const recentContextMessageLimit = clampInt(
settings.recallLlmContextMessages,
4,
@@ -1937,6 +2101,7 @@ async function runRecall() {
recentMessages,
embeddingConfig: getEmbeddingConfig(),
schema: getSchema(),
signal: recallSignal,
options: {
topK: settings.recallTopK,
maxRecallNodes: settings.recallMaxNodes,
@@ -2020,6 +2185,12 @@ async function runRecall() {
}
}
} catch (e) {
if (isAbortError(e)) {
setLastRecallStatus("召回已终止", e?.message || "已手动终止当前召回", "warning", {
syncRuntime: true,
});
return;
}
console.error("[ST-BME] 召回失败:", e);
const message = e?.message || String(e);
setLastRecallStatus("召回失败", message, "error", {
@@ -2028,6 +2199,7 @@ async function runRecall() {
});
toastr.error(`召回失败: ${message}`);
} finally {
finishStageAbortController("recall", recallController);
isRecalling = false;
refreshPanelLiveState();
}
@@ -2039,6 +2211,7 @@ function onChatChanged() {
clearTimeout(pendingHistoryRecoveryTimer);
pendingHistoryRecoveryTimer = null;
pendingHistoryRecoveryTrigger = "";
abortAllRunningStages();
dismissAllStageNotices();
loadGraphFromChat();
clearInjectionState();
@@ -2375,6 +2548,8 @@ async function onManualExtract() {
const warnings = [];
isExtracting = true;
const extractionController = beginStageAbortController("extraction");
const extractionSignal = extractionController.signal;
setLastExtractionStatus(
"手动提取中",
`待处理 assistant 楼层 ${pendingAssistantTurns.length}`,
@@ -2396,6 +2571,7 @@ async function onManualExtract() {
startIdx,
endIdx,
settings,
signal: extractionSignal,
});
if (!batchResult.success) {
@@ -2438,6 +2614,12 @@ async function onManualExtract() {
);
}
} catch (e) {
if (isAbortError(e)) {
setLastExtractionStatus("手动提取已终止", e?.message || "已手动终止当前提取", "warning", {
syncRuntime: true,
});
return;
}
console.error("[ST-BME] 手动提取失败:", e);
setLastExtractionStatus("手动提取失败", e?.message || String(e), "error", {
syncRuntime: true,
@@ -2446,6 +2628,7 @@ async function onManualExtract() {
});
toastr.error(`手动提取失败: ${e.message || e}`);
} finally {
finishStageAbortController("extraction", extractionController);
isExtracting = false;
}
}
@@ -2510,21 +2693,30 @@ async function onRebuildVectorIndex(range = null) {
return;
}
const result = await syncVectorState({
force: true,
purge: isBackendVectorConfig(config) && !range,
range,
});
const vectorController = beginStageAbortController("vector");
try {
const result = await syncVectorState({
force: true,
purge: isBackendVectorConfig(config) && !range,
range,
signal: vectorController.signal,
});
saveGraphToChat();
if (result?.error) {
throw new Error(result.error);
saveGraphToChat();
if (result?.aborted) {
return;
}
if (result?.error) {
throw new Error(result.error);
}
toastr.success(
range
? `范围向量重建完成indexed=${result.stats.indexed}, pending=${result.stats.pending}`
: `当前聊天向量重建完成indexed=${result.stats.indexed}, pending=${result.stats.pending}`,
);
} finally {
finishStageAbortController("vector", vectorController);
}
toastr.success(
range
? `范围向量重建完成indexed=${result.stats.indexed}, pending=${result.stats.pending}`
: `当前聊天向量重建完成indexed=${result.stats.indexed}, pending=${result.stats.pending}`,
);
}
async function onReembedDirect() {