feat: add maintenance concurrency modes

This commit is contained in:
Youzini-afk
2026-04-29 14:24:35 +08:00
parent 9c84d71889
commit 899774d636
15 changed files with 503 additions and 59 deletions

View File

@@ -10,6 +10,7 @@ import {
searchAuthorityTriviumNodes,
} from "../vector/authority-vector-primary-adapter.js";
import { embedText } from "../vector/embedding.js";
import { runLimited } from "../runtime/concurrency.js";
function nowMs() {
if (typeof performance?.now === "function") {
@@ -148,6 +149,7 @@ export async function resolveAuthorityRecallCandidates({
fallbackReason: "",
timings: {
total: 0,
embed: 0,
filter: 0,
search: 0,
neighbors: 0,
@@ -245,13 +247,20 @@ export async function resolveAuthorityRecallCandidates({
diagnostics.timings.filter = roundMs(nowMs() - filterStartedAt);
const searchScores = new Map();
let embedMs = 0;
const searchStartedAt = nowMs();
for (const queryEntry of queryPlan.queries) {
try {
const queryVec = await embedText(queryEntry.text, embeddingConfig, { signal, isQuery: true });
const searchResultsByQuery = await runLimited(
queryPlan.queries,
async (queryEntry) => {
const embedStartedAt = nowMs();
const queryVec = await embedText(queryEntry.text, embeddingConfig, {
signal,
isQuery: true,
});
embedMs += nowMs() - embedStartedAt;
if (!queryVec) {
diagnostics.fallbackReason = diagnostics.fallbackReason || "authority-candidate-query-embed-empty";
continue;
diagnostics.fallbackReason ||= "authority-candidate-query-embed-empty";
return [];
}
const searchResults = await searchAuthorityTriviumNodes(
graph,
@@ -267,16 +276,38 @@ export async function resolveAuthorityRecallCandidates({
signal,
},
);
for (const result of searchResults) {
const nodeId = normalizeRecordId(result?.nodeId);
if (!nodeId || !allowedIds.has(nodeId)) continue;
const weightedScore = Math.max(0.001, Number(result?.score || 0) || 0) * queryEntry.weight;
const previous = Number(searchScores.get(nodeId) || 0) || 0;
if (weightedScore > previous) {
searchScores.set(nodeId, weightedScore);
}
return searchResults.map((result) => ({ ...result, queryWeight: queryEntry.weight }));
},
{
concurrency: Math.max(1, Math.floor(Number(options.queryConcurrency || 1)) || 1),
signal,
failFast: false,
},
);
diagnostics.timings.embed = roundMs(embedMs);
for (const searchResults of searchResultsByQuery) {
if (searchResults?.error) {
diagnostics.fallbackReason ||= "authority-candidate-search-failed";
if (embeddingConfig?.failOpen === false) {
throw searchResults.error;
}
} catch (error) {
continue;
}
for (const result of searchResults || []) {
const nodeId = normalizeRecordId(result?.nodeId);
if (!nodeId || !allowedIds.has(nodeId)) continue;
const weightedScore =
Math.max(0.001, Number(result?.score || 0) || 0) *
Math.max(0.05, Number(result?.queryWeight || 0) || 0.05);
const previous = Number(searchScores.get(nodeId) || 0) || 0;
if (weightedScore > previous) {
searchScores.set(nodeId, weightedScore);
}
}
}
for (const item of searchResultsByQuery) {
if (item?.error) {
const error = item.error;
if (isAbortError(error)) throw error;
diagnostics.fallbackReason ||= "authority-candidate-search-failed";
if (embeddingConfig?.failOpen === false) {

View File

@@ -1295,6 +1295,7 @@ export async function retrieve({
limit: options.authorityCandidateLimit,
neighborLimit: options.authorityCandidateNeighborLimit,
minimumUsedCandidateCount: options.authorityCandidateMinCount,
queryConcurrency: options.authorityCandidateQueryConcurrency || options.vectorQueryConcurrency,
enableContextQueryBlend,
contextAssistantWeight,
contextPreviousUserWeight,
@@ -1427,6 +1428,7 @@ export async function retrieve({
enableLexicalBoost,
lexicalWeight,
weights,
vectorQueryConcurrency: options.vectorQueryConcurrency,
activeNodes: rankingActiveNodes,
},
});

View File

@@ -3,6 +3,7 @@ import { findSimilarNodesByText, validateVectorConfig } from "../vector/vector-i
import { hybridScore } from "./dynamics.js";
import { diffuseAndRank } from "./diffusion.js";
import { mergeVectorResults, splitIntentSegments } from "./retrieval-enhancer.js";
import { runLimited } from "../runtime/concurrency.js";
function nowMs() {
if (typeof performance?.now === "function") {
@@ -607,9 +608,22 @@ export async function rankNodesForTaskContext({
let vectorResults = [];
const vectorStartedAt = nowMs();
if (enableVectorPrefilter && vectorValidation.valid) {
const groups = [];
const vectorTasks = [];
for (const part of queryPlan.plan) {
for (const queryText of part.queries) {
vectorTasks.push({
queryText,
weight: part.weight || 1,
});
}
}
const vectorQueryConcurrency = clampPositiveInt(
options.vectorQueryConcurrency,
1,
);
const groups = await runLimited(
vectorTasks,
async ({ queryText, weight }) => {
const results = await vectorPreFilter(
graph,
queryText,
@@ -618,9 +632,13 @@ export async function rankNodesForTaskContext({
topK,
signal,
);
groups.push(scaleVectorResults(results, part.weight || 1));
}
}
return scaleVectorResults(results, weight);
},
{
concurrency: vectorQueryConcurrency,
signal,
},
);
const merged = mergeVectorResults(groups, Math.max(topK * 2, 24));
diagnostics.vectorHits = merged.rawHitCount;