refactor: move extraction and recall pipelines into controllers

This commit is contained in:
Youzini-afk
2026-03-29 16:45:21 +08:00
parent 81d06870db
commit 29e0b838c0
3 changed files with 407 additions and 331 deletions

View File

@@ -123,3 +123,102 @@ export async function executeExtractionBatchController(
"批次未完成 finalize 闭环",
};
}
export async function runExtractionController(runtime) {
if (runtime.getIsExtracting() || !runtime.getCurrentGraph()) return;
const settings = runtime.getSettings();
if (!settings.enabled) return;
if (!runtime.ensureGraphMutationReady("自动提取", { notify: false })) {
runtime.setLastExtractionStatus(
"等待图谱加载",
runtime.getGraphMutationBlockReason("自动提取"),
"warning",
{ syncRuntime: true },
);
return;
}
if (!(await runtime.recoverHistoryIfNeeded("auto-extract"))) return;
const context = runtime.getContext();
const chat = context.chat;
if (!chat || chat.length === 0) return;
const assistantTurns = runtime.getAssistantTurns(chat);
const lastProcessed = runtime.getLastProcessedAssistantFloor();
const unprocessedAssistantTurns = assistantTurns.filter((i) => i > lastProcessed);
if (unprocessedAssistantTurns.length === 0) return;
const extractEvery = runtime.clampInt(settings.extractEvery, 1, 1, 50);
const smartTriggerDecision = settings.enableSmartTrigger
? runtime.getSmartTriggerDecision(chat, lastProcessed, settings)
: { triggered: false, score: 0, reasons: [] };
if (
unprocessedAssistantTurns.length < extractEvery &&
!smartTriggerDecision.triggered
) {
return;
}
const batchAssistantTurns = smartTriggerDecision.triggered
? unprocessedAssistantTurns
: unprocessedAssistantTurns.slice(0, extractEvery);
const startIdx = batchAssistantTurns[0];
const endIdx = batchAssistantTurns[batchAssistantTurns.length - 1];
runtime.setIsExtracting(true);
const extractionController = runtime.beginStageAbortController("extraction");
const extractionSignal = extractionController.signal;
runtime.setLastExtractionStatus(
"提取中",
`楼层 ${startIdx}-${endIdx}${smartTriggerDecision.triggered ? " · 智能触发" : ""}`,
"running",
{ syncRuntime: true },
);
try {
const batchResult = await runtime.executeExtractionBatch({
chat,
startIdx,
endIdx,
settings,
smartTriggerDecision,
signal: extractionSignal,
});
if (!batchResult.success) {
const message =
batchResult.error ||
batchResult?.result?.error ||
"提取批次未返回有效结果";
runtime.console.warn("[ST-BME] 提取批次未返回有效结果:", message);
runtime.notifyExtractionIssue(message);
return;
}
runtime.setLastExtractionStatus(
"提取完成",
`楼层 ${startIdx}-${endIdx} · 新建 ${batchResult.result?.newNodes || 0} · 更新 ${batchResult.result?.updatedNodes || 0} · 新边 ${batchResult.result?.newEdges || 0}`,
"success",
{ syncRuntime: true },
);
} catch (e) {
if (runtime.isAbortError(e)) {
runtime.setLastExtractionStatus(
"提取已终止",
e?.message || "已手动终止当前提取",
"warning",
{
syncRuntime: true,
},
);
return;
}
runtime.console.error("[ST-BME] 提取失败:", e);
runtime.notifyExtractionIssue(e?.message || String(e) || "自动提取失败");
} finally {
runtime.finishStageAbortController("extraction", extractionController);
runtime.setIsExtracting(false);
}
}

445
index.js
View File

@@ -18,16 +18,20 @@ import {
import { compressAll, sleepCycle } from "./compressor.js";
import { consolidateMemories } from "./consolidator.js";
import {
executeExtractionBatchController,
runExtractionController,
} from "./extraction-controller.js";
import {
extractMemories,
generateReflection,
generateSynopsis,
} from "./extractor.js";
import { executeExtractionBatchController } from "./extraction-controller.js";
import {
applyRecallInjectionController,
buildRecallRecentMessagesController,
getRecallUserMessageSourceLabelController,
runRecallController,
resolveRecallInputController,
} from "./recall-controller.js";
import {
@@ -4186,104 +4190,29 @@ async function recoverHistoryIfNeeded(trigger = "history-recovery") {
* 提取管线:处理未提取的对话楼层
*/
async function runExtraction() {
if (isExtracting || !currentGraph) return;
const settings = getSettings();
if (!settings.enabled) return;
if (!ensureGraphMutationReady("自动提取", { notify: false })) {
setLastExtractionStatus(
"等待图谱加载",
getGraphMutationBlockReason("自动提取"),
"warning",
{ syncRuntime: true },
);
return;
}
if (!(await recoverHistoryIfNeeded("auto-extract"))) return;
const context = getContext();
const chat = context.chat;
if (!chat || chat.length === 0) return;
const assistantTurns = getAssistantTurns(chat);
const lastProcessed = getLastProcessedAssistantFloor();
const unprocessedAssistantTurns = assistantTurns.filter(
(i) => i > lastProcessed,
);
if (unprocessedAssistantTurns.length === 0) return;
const extractEvery = clampInt(settings.extractEvery, 1, 1, 50);
const smartTriggerDecision = settings.enableSmartTrigger
? getSmartTriggerDecision(chat, lastProcessed, settings)
: { triggered: false, score: 0, reasons: [] };
if (
unprocessedAssistantTurns.length < extractEvery &&
!smartTriggerDecision.triggered
) {
return;
}
const batchAssistantTurns = smartTriggerDecision.triggered
? unprocessedAssistantTurns
: unprocessedAssistantTurns.slice(0, extractEvery);
const startIdx = batchAssistantTurns[0];
const endIdx = batchAssistantTurns[batchAssistantTurns.length - 1];
isExtracting = true;
const extractionController = beginStageAbortController("extraction");
const extractionSignal = extractionController.signal;
setLastExtractionStatus(
"提取中",
`楼层 ${startIdx}-${endIdx}${smartTriggerDecision.triggered ? " · 智能触发" : ""}`,
"running",
{ syncRuntime: true },
);
try {
const batchResult = await executeExtractionBatch({
chat,
startIdx,
endIdx,
settings,
smartTriggerDecision,
signal: extractionSignal,
});
if (!batchResult.success) {
const message =
batchResult.error ||
batchResult?.result?.error ||
"提取批次未返回有效结果";
console.warn("[ST-BME] 提取批次未返回有效结果:", message);
notifyExtractionIssue(message);
return;
}
setLastExtractionStatus(
"提取完成",
`楼层 ${startIdx}-${endIdx} · 新建 ${batchResult.result?.newNodes || 0} · 更新 ${batchResult.result?.updatedNodes || 0} · 新边 ${batchResult.result?.newEdges || 0}`,
"success",
{ syncRuntime: true },
);
} catch (e) {
if (isAbortError(e)) {
setLastExtractionStatus(
"提取已终止",
e?.message || "已手动终止当前提取",
"warning",
{
syncRuntime: true,
},
);
return;
}
console.error("[ST-BME] 提取失败:", e);
notifyExtractionIssue(e?.message || String(e) || "自动提取失败");
} finally {
finishStageAbortController("extraction", extractionController);
isExtracting = false;
}
return await runExtractionController({
beginStageAbortController,
clampInt,
console,
ensureGraphMutationReady,
executeExtractionBatch,
finishStageAbortController,
getAssistantTurns,
getContext,
getCurrentGraph: () => currentGraph,
getGraphMutationBlockReason,
getIsExtracting: () => isExtracting,
getLastProcessedAssistantFloor,
getSettings,
getSmartTriggerDecision,
isAbortError,
notifyExtractionIssue,
recoverHistoryIfNeeded,
setIsExtracting: (value) => {
isExtracting = value;
},
setLastExtractionStatus,
});
}
function applyRecallInjection(settings, recallInput, recentMessages, result) {
@@ -4318,242 +4247,96 @@ function applyRecallInjection(settings, recallInput, recentMessages, result) {
);
}
function buildRecallRetrieveOptions(settings, context) {
return {
topK: settings.recallTopK,
maxRecallNodes: settings.recallMaxNodes,
enableLLMRecall: settings.recallEnableLLM,
enableVectorPrefilter: settings.recallEnableVectorPrefilter,
enableGraphDiffusion: settings.recallEnableGraphDiffusion,
diffusionTopK: settings.recallDiffusionTopK,
llmCandidatePool: settings.recallLlmCandidatePool,
recallPrompt: undefined,
weights: {
graphWeight: settings.graphWeight,
vectorWeight: settings.vectorWeight,
importanceWeight: settings.importanceWeight,
},
// v2 options
enableVisibility: settings.enableVisibility ?? false,
visibilityFilter: context.name2 || null,
enableCrossRecall: settings.enableCrossRecall ?? false,
enableProbRecall: settings.enableProbRecall ?? false,
probRecallChance: settings.probRecallChance ?? 0.15,
enableMultiIntent: settings.recallEnableMultiIntent ?? true,
multiIntentMaxSegments: settings.recallMultiIntentMaxSegments ?? 4,
teleportAlpha: settings.recallTeleportAlpha ?? 0.15,
enableTemporalLinks: settings.recallEnableTemporalLinks ?? true,
temporalLinkStrength: settings.recallTemporalLinkStrength ?? 0.2,
enableDiversitySampling: settings.recallEnableDiversitySampling ?? true,
dppCandidateMultiplier: settings.recallDppCandidateMultiplier ?? 3,
dppQualityWeight: settings.recallDppQualityWeight ?? 1.0,
enableCooccurrenceBoost: settings.recallEnableCooccurrenceBoost ?? false,
cooccurrenceScale: settings.recallCooccurrenceScale ?? 0.1,
cooccurrenceMaxNeighbors: settings.recallCooccurrenceMaxNeighbors ?? 10,
enableResidualRecall: settings.recallEnableResidualRecall ?? false,
residualBasisMaxNodes: settings.recallResidualBasisMaxNodes ?? 24,
residualNmfTopics: settings.recallNmfTopics ?? 15,
residualNmfNoveltyThreshold: settings.recallNmfNoveltyThreshold ?? 0.4,
residualThreshold: settings.recallResidualThreshold ?? 0.3,
residualTopK: settings.recallResidualTopK ?? 5,
};
}
/**
* 召回管线:检索并注入记忆
*/
async function runRecall(options = {}) {
if (isRecalling) {
abortRecallStageWithReason("旧召回已取消,正在启动新的召回");
const settle = await waitForActiveRecallToSettle();
if (!settle.settled && isRecalling) {
setLastRecallStatus(
"召回忙",
"上一轮召回仍在清理,请稍后重试",
"warning",
{
syncRuntime: true,
},
);
return createRecallRunResult("skipped", {
reason: "上一轮召回仍在清理",
});
}
}
if (!currentGraph) {
return createRecallRunResult("skipped", {
reason: "当前无图谱",
});
}
const settings = getSettings();
if (!settings.enabled || !settings.recallEnabled) {
return createRecallRunResult("skipped", {
reason: "召回功能未启用",
});
}
if (!isGraphReadable()) {
const reason = getGraphMutationBlockReason("召回");
setLastRecallStatus("等待图谱加载", reason, "warning", {
syncRuntime: true,
});
return createRecallRunResult("skipped", {
reason,
});
}
if (isGraphMetadataWriteAllowed()) {
if (!(await recoverHistoryIfNeeded("pre-recall"))) {
return createRecallRunResult("skipped", {
reason: "历史恢复未就绪",
});
}
}
const context = getContext();
const chat = context.chat;
if (!chat || chat.length === 0) {
return createRecallRunResult("skipped", {
reason: "当前聊天为空",
});
}
const runId = ++recallRunSequence;
let recallPromise = null;
recallPromise = (async () => {
isRecalling = true;
const recallController = beginStageAbortController("recall");
const recallSignal = recallController.signal;
if (options.signal) {
if (options.signal.aborted) {
recallController.abort(
options.signal.reason || createAbortError("宿主已终止生成"),
);
} else {
options.signal.addEventListener(
"abort",
() =>
recallController.abort(
options.signal.reason || createAbortError("宿主已终止生成"),
),
{ once: true },
);
}
}
try {
await ensureVectorReadyIfNeeded("pre-recall", recallSignal);
const recentContextMessageLimit = clampInt(
settings.recallLlmContextMessages,
4,
0,
20,
);
const recallInput = resolveRecallInput(
chat,
recentContextMessageLimit,
options,
);
const userMessage = recallInput.userMessage;
const recentMessages = recallInput.recentMessages;
if (!userMessage) {
return createRecallRunResult("skipped", {
reason: "当前没有可用于召回的用户输入",
});
}
recallInput.hookName = options.hookName || "";
console.log("[ST-BME] 开始召回", {
source: recallInput.source,
sourceLabel: recallInput.sourceLabel,
hookName: recallInput.hookName,
userMessageLength: userMessage.length,
recentMessages: recentMessages.length,
runId,
});
setLastRecallStatus(
"召回中",
[
getRecallHookLabel(recallInput.hookName),
`来源 ${recallInput.sourceLabel}`,
`上下文 ${recentMessages.length}`,
`当前用户消息长度 ${userMessage.length}`,
]
.filter(Boolean)
.join(" · "),
"running",
{ syncRuntime: true },
);
if (recallInput.source === "send-intent") {
pendingRecallSendIntent = createRecallInputRecord();
}
const result = await retrieve({
graph: currentGraph,
userMessage,
recentMessages,
embeddingConfig: getEmbeddingConfig(),
schema: getSchema(),
signal: recallSignal,
settings,
onStreamProgress: ({ previewText, receivedChars }) => {
const preview =
previewText?.length > 60
? "…" + previewText.slice(-60)
: previewText || "";
setLastRecallStatus(
"AI 生成中",
`${preview} [${receivedChars}字]`,
"running",
{ syncRuntime: true, noticeMarquee: true },
);
},
options: {
topK: settings.recallTopK,
maxRecallNodes: settings.recallMaxNodes,
enableLLMRecall: settings.recallEnableLLM,
enableVectorPrefilter: settings.recallEnableVectorPrefilter,
enableGraphDiffusion: settings.recallEnableGraphDiffusion,
diffusionTopK: settings.recallDiffusionTopK,
llmCandidatePool: settings.recallLlmCandidatePool,
recallPrompt: undefined,
weights: {
graphWeight: settings.graphWeight,
vectorWeight: settings.vectorWeight,
importanceWeight: settings.importanceWeight,
},
// v2 options
enableVisibility: settings.enableVisibility ?? false,
visibilityFilter: context.name2 || null,
enableCrossRecall: settings.enableCrossRecall ?? false,
enableProbRecall: settings.enableProbRecall ?? false,
probRecallChance: settings.probRecallChance ?? 0.15,
enableMultiIntent: settings.recallEnableMultiIntent ?? true,
multiIntentMaxSegments: settings.recallMultiIntentMaxSegments ?? 4,
teleportAlpha: settings.recallTeleportAlpha ?? 0.15,
enableTemporalLinks: settings.recallEnableTemporalLinks ?? true,
temporalLinkStrength: settings.recallTemporalLinkStrength ?? 0.2,
enableDiversitySampling:
settings.recallEnableDiversitySampling ?? true,
dppCandidateMultiplier: settings.recallDppCandidateMultiplier ?? 3,
dppQualityWeight: settings.recallDppQualityWeight ?? 1.0,
enableCooccurrenceBoost:
settings.recallEnableCooccurrenceBoost ?? false,
cooccurrenceScale: settings.recallCooccurrenceScale ?? 0.1,
cooccurrenceMaxNeighbors:
settings.recallCooccurrenceMaxNeighbors ?? 10,
enableResidualRecall: settings.recallEnableResidualRecall ?? false,
residualBasisMaxNodes: settings.recallResidualBasisMaxNodes ?? 24,
residualNmfTopics: settings.recallNmfTopics ?? 15,
residualNmfNoveltyThreshold:
settings.recallNmfNoveltyThreshold ?? 0.4,
residualThreshold: settings.recallResidualThreshold ?? 0.3,
residualTopK: settings.recallResidualTopK ?? 5,
},
});
applyRecallInjection(settings, recallInput, recentMessages, result);
return createRecallRunResult("completed", {
reason: "召回完成",
selectedNodeIds: result.selectedNodeIds || [],
});
} catch (e) {
if (isAbortError(e)) {
setLastRecallStatus(
"召回已终止",
e?.message || "已手动终止当前召回",
"warning",
{
syncRuntime: true,
},
);
return createRecallRunResult("aborted", {
reason: e?.message || "召回已终止",
});
}
console.error("[ST-BME] 召回失败:", e);
const message = e?.message || String(e);
setLastRecallStatus("召回失败", message, "error", {
syncRuntime: true,
toastKind: "",
});
toastr.error(`召回失败: ${message}`);
return createRecallRunResult("failed", {
reason: message,
});
} finally {
finishStageAbortController("recall", recallController);
isRecalling = false;
if (activeRecallPromise === recallPromise) {
activeRecallPromise = null;
}
refreshPanelLiveState();
}
})();
activeRecallPromise = recallPromise;
return await recallPromise;
return await runRecallController(
{
abortRecallStageWithReason,
applyRecallInjection,
beginStageAbortController,
buildRecallRetrieveOptions,
clampInt,
console,
createAbortError,
createRecallInputRecord,
createRecallRunResult,
ensureVectorReadyIfNeeded,
finishStageAbortController,
getActiveRecallPromise: () => activeRecallPromise,
getContext,
getCurrentGraph: () => currentGraph,
getEmbeddingConfig,
getGraphMutationBlockReason,
getIsRecalling: () => isRecalling,
getRecallHookLabel,
getSchema,
getSettings,
isAbortError,
isGraphMetadataWriteAllowed,
isGraphReadable,
nextRecallRunSequence: () => ++recallRunSequence,
recoverHistoryIfNeeded,
refreshPanelLiveState,
resolveRecallInput,
retrieve,
setActiveRecallPromise: (value) => {
activeRecallPromise = value;
},
setIsRecalling: (value) => {
isRecalling = value;
},
setLastRecallStatus,
setPendingRecallSendIntent: (value) => {
pendingRecallSendIntent = value;
},
toastr,
waitForActiveRecallToSettle,
},
options,
);
}
// ==================== 事件钩子 ====================

View File

@@ -220,3 +220,197 @@ export function applyRecallInjectionController(
return { injectionText, retrievalMeta, llmMeta };
}
export async function runRecallController(runtime, options = {}) {
if (runtime.getIsRecalling()) {
runtime.abortRecallStageWithReason("旧召回已取消,正在启动新的召回");
const settle = await runtime.waitForActiveRecallToSettle();
if (!settle.settled && runtime.getIsRecalling()) {
runtime.setLastRecallStatus(
"召回忙",
"上一轮召回仍在清理,请稍后重试",
"warning",
{
syncRuntime: true,
},
);
return runtime.createRecallRunResult("skipped", {
reason: "上一轮召回仍在清理",
});
}
}
if (!runtime.getCurrentGraph()) {
return runtime.createRecallRunResult("skipped", {
reason: "当前无图谱",
});
}
const settings = runtime.getSettings();
if (!settings.enabled || !settings.recallEnabled) {
return runtime.createRecallRunResult("skipped", {
reason: "召回功能未启用",
});
}
if (!runtime.isGraphReadable()) {
const reason = runtime.getGraphMutationBlockReason("召回");
runtime.setLastRecallStatus("等待图谱加载", reason, "warning", {
syncRuntime: true,
});
return runtime.createRecallRunResult("skipped", {
reason,
});
}
if (runtime.isGraphMetadataWriteAllowed()) {
if (!(await runtime.recoverHistoryIfNeeded("pre-recall"))) {
return runtime.createRecallRunResult("skipped", {
reason: "历史恢复未就绪",
});
}
}
const context = runtime.getContext();
const chat = context.chat;
if (!chat || chat.length === 0) {
return runtime.createRecallRunResult("skipped", {
reason: "当前聊天为空",
});
}
const runId = runtime.nextRecallRunSequence();
let recallPromise = null;
recallPromise = (async () => {
runtime.setIsRecalling(true);
const recallController = runtime.beginStageAbortController("recall");
const recallSignal = recallController.signal;
if (options.signal) {
if (options.signal.aborted) {
recallController.abort(
options.signal.reason || runtime.createAbortError("宿主已终止生成"),
);
} else {
options.signal.addEventListener(
"abort",
() =>
recallController.abort(
options.signal.reason || runtime.createAbortError("宿主已终止生成"),
),
{ once: true },
);
}
}
try {
await runtime.ensureVectorReadyIfNeeded("pre-recall", recallSignal);
const recentContextMessageLimit = runtime.clampInt(
settings.recallLlmContextMessages,
4,
0,
20,
);
const recallInput = runtime.resolveRecallInput(
chat,
recentContextMessageLimit,
options,
);
const userMessage = recallInput.userMessage;
const recentMessages = recallInput.recentMessages;
if (!userMessage) {
return runtime.createRecallRunResult("skipped", {
reason: "当前没有可用于召回的用户输入",
});
}
recallInput.hookName = options.hookName || "";
runtime.console.log("[ST-BME] 开始召回", {
source: recallInput.source,
sourceLabel: recallInput.sourceLabel,
hookName: recallInput.hookName,
userMessageLength: userMessage.length,
recentMessages: recentMessages.length,
runId,
});
runtime.setLastRecallStatus(
"召回中",
[
runtime.getRecallHookLabel(recallInput.hookName),
`来源 ${recallInput.sourceLabel}`,
`上下文 ${recentMessages.length}`,
`当前用户消息长度 ${userMessage.length}`,
]
.filter(Boolean)
.join(" · "),
"running",
{ syncRuntime: true },
);
if (recallInput.source === "send-intent") {
runtime.setPendingRecallSendIntent(runtime.createRecallInputRecord());
}
const result = await runtime.retrieve({
graph: runtime.getCurrentGraph(),
userMessage,
recentMessages,
embeddingConfig: runtime.getEmbeddingConfig(),
schema: runtime.getSchema(),
signal: recallSignal,
settings,
onStreamProgress: ({ previewText, receivedChars }) => {
const preview =
previewText?.length > 60
? "…" + previewText.slice(-60)
: previewText || "";
runtime.setLastRecallStatus(
"AI 生成中",
`${preview} [${receivedChars}字]`,
"running",
{ syncRuntime: true, noticeMarquee: true },
);
},
options: runtime.buildRecallRetrieveOptions(settings, context),
});
runtime.applyRecallInjection(settings, recallInput, recentMessages, result);
return runtime.createRecallRunResult("completed", {
reason: "召回完成",
selectedNodeIds: result.selectedNodeIds || [],
});
} catch (e) {
if (runtime.isAbortError(e)) {
runtime.setLastRecallStatus(
"召回已终止",
e?.message || "已手动终止当前召回",
"warning",
{
syncRuntime: true,
},
);
return runtime.createRecallRunResult("aborted", {
reason: e?.message || "召回已终止",
});
}
runtime.console.error("[ST-BME] 召回失败:", e);
const message = e?.message || String(e);
runtime.setLastRecallStatus("召回失败", message, "error", {
syncRuntime: true,
toastKind: "",
});
runtime.toastr.error(`召回失败: ${message}`);
return runtime.createRecallRunResult("failed", {
reason: message,
});
} finally {
runtime.finishStageAbortController("recall", recallController);
runtime.setIsRecalling(false);
if (runtime.getActiveRecallPromise() === recallPromise) {
runtime.setActiveRecallPromise(null);
}
runtime.refreshPanelLiveState();
}
})();
runtime.setActiveRecallPromise(recallPromise);
return await recallPromise;
}