Add persistence and retrieval observability with native delta gating

Youzini-afk
2026-04-21 20:32:03 +08:00
parent 5a8f563168
commit d2c3d1f5dd
8 changed files with 745 additions and 35 deletions

@@ -63,6 +63,17 @@ function throwIfAborted(signal) {
   }
 }
+function nowMs() {
+  if (typeof performance?.now === "function") {
+    return performance.now();
+  }
+  return Date.now();
+}
+function roundMs(value) {
+  return Math.round((Number(value) || 0) * 10) / 10;
+}
 export const BACKEND_DEFAULT_MODELS = {
   openai: "text-embedding-3-small",
   openrouter: "openai/text-embedding-3-small",
@@ -349,16 +360,38 @@ function getEligibleVectorNodes(graph, range = null) {
   return nodes.filter((node) => buildNodeVectorText(node).length > 0);
 }
-function buildDesiredVectorEntries(graph, config, range = null) {
-  return getEligibleVectorNodes(graph, range).map((node) => {
-    const hash = buildNodeVectorHash(node, config);
+function buildDesiredVectorEntries(graph, config, range = null, diagnostics = null) {
+  const modelScope = getVectorModelScope(config);
+  let textBuildMs = 0;
+  let hashBuildMs = 0;
+  const entries = getEligibleVectorNodes(graph, range).map((node) => {
+    const textStartedAt = diagnostics ? nowMs() : 0;
+    const text = buildNodeVectorText(node);
+    if (diagnostics) {
+      textBuildMs += nowMs() - textStartedAt;
+    }
+    const seqEnd = node?.seqRange?.[1] ?? node?.seq ?? 0;
+    const hashStartedAt = diagnostics ? nowMs() : 0;
+    const payload = [node?.id || "", text, String(seqEnd), modelScope].join("::");
+    const hash = stableHashString(payload);
+    if (diagnostics) {
+      hashBuildMs += nowMs() - hashStartedAt;
+    }
     return {
       nodeId: node.id,
       hash,
-      text: buildNodeVectorText(node),
-      index: node?.seqRange?.[1] ?? node?.seq ?? 0,
+      text,
+      index: seqEnd,
     };
   });
+  if (diagnostics && typeof diagnostics === "object") {
+    diagnostics.textBuildMs = roundMs(textBuildMs);
+    diagnostics.hashBuildMs = roundMs(hashBuildMs);
+    diagnostics.entryCount = entries.length;
+  }
+  return entries;
 }
 function computeVectorStats(graph, desiredEntries) {
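
The per-node hash above is what makes the delta gating work: node id, rendered vector text, the end of the node's seq range, and the model scope are folded into one payload, so an entry is re-embedded only when one of those inputs changes. stableHashString is defined elsewhere in this module; a sketch of the idea with a stand-in FNV-1a hash (the real function may differ):

// Sketch only: fnv1a stands in for the module's stableHashString.
function fnv1a(str) {
  let hash = 0x811c9dc5; // FNV-1a 32-bit offset basis
  for (let i = 0; i < str.length; i++) {
    hash ^= str.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0; // FNV prime, kept unsigned
  }
  return hash.toString(16);
}

const payload = ["node-1", "vector text", "42", "openai/text-embedding-3-small"].join("::");
fnv1a(payload); // same id/text/seq/scope => same hash => entry skipped on resync
fnv1a(payload + "!"); // any input change => new hash => delete + re-embed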
@@ -547,26 +580,54 @@ export async function syncGraphVectorIndex(
     return {
       insertedHashes: [],
       stats: { total: 0, indexed: 0, stale: 0, pending: 0 },
+      timings: null,
     };
   }
   throwIfAborted(signal);
+  const syncStartedAt = nowMs();
+  const syncMode = isBackendVectorConfig(config) ? "backend" : "direct";
   const validation = validateVectorConfig(config);
   if (!validation.valid) {
     graph.vectorIndexState.lastWarning = validation.error;
     graph.vectorIndexState.dirty = true;
-    return { insertedHashes: [], stats: graph.vectorIndexState.lastStats };
+    graph.vectorIndexState.lastTimings = {
+      mode: syncMode,
+      validationError: validation.error,
+      totalMs: roundMs(nowMs() - syncStartedAt),
+      updatedAt: Date.now(),
+    };
+    return {
+      insertedHashes: [],
+      stats: graph.vectorIndexState.lastStats,
+      timings: graph.vectorIndexState.lastTimings,
+    };
   }
   const state = graph.vectorIndexState;
   const collectionId = buildVectorCollectionId(
     chatId || graph?.historyState?.chatId,
   );
-  const desiredEntries = buildDesiredVectorEntries(graph, config, range);
+  const desiredBuildDiagnostics = {};
+  const desiredBuildStartedAt = nowMs();
+  const desiredEntries = buildDesiredVectorEntries(
+    graph,
+    config,
+    range,
+    desiredBuildDiagnostics,
+  );
+  const desiredBuildMs = nowMs() - desiredBuildStartedAt;
   const desiredByNodeId = new Map(
     desiredEntries.map((entry) => [entry.nodeId, entry]),
   );
   const insertedHashes = [];
+  let backendPurgeMs = 0;
+  let backendDeleteMs = 0;
+  let backendInsertMs = 0;
+  let embedBatchMs = 0;
+  let deletedHashCount = 0;
+  let embeddingsRequested = 0;
   const hasConcreteRange =
     range && Number.isFinite(range.start) && Number.isFinite(range.end);
   const rangedNodeIds = new Set(desiredEntries.map((entry) => entry.nodeId));
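
With the accumulators declared, syncGraphVectorIndex can hand the measurements back to its caller next to the stats. A hypothetical consumer inside an async caller (the exact call signature is assumed here, not taken from the diff):

// Hypothetical log line over the timings shape returned by this function.
const { stats, timings } = await syncGraphVectorIndex(graph, config, { chatId, signal });
if (timings) {
  console.debug(
    `[vector-sync] mode=${timings.mode} total=${timings.totalMs}ms ` +
      `build=${timings.desiredBuildMs ?? 0}ms embed=${timings.embedBatchMs ?? 0}ms ` +
      `indexed=${stats?.indexed ?? 0}`,
  );
}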
@@ -581,9 +642,13 @@ export async function syncGraphVectorIndex(
     purge || state.dirty || scopeChanged || (force && !hasConcreteRange);
   if (fullReset) {
+    const purgeStartedAt = nowMs();
     await purgeVectorCollection(collectionId, signal);
+    backendPurgeMs += nowMs() - purgeStartedAt;
     resetVectorMappings(graph, config, chatId);
+    const insertStartedAt = nowMs();
     await insertVectorEntries(collectionId, config, desiredEntries, signal);
+    backendInsertMs += nowMs() - insertStartedAt;
     for (const entry of desiredEntries) {
       state.hashToNodeId[entry.hash] = entry.nodeId;
       state.nodeToHash[entry.nodeId] = entry.hash;
@@ -623,8 +688,13 @@ export async function syncGraphVectorIndex(
       entriesToInsert.push(entry);
     }
+    deletedHashCount = hashesToDelete.length;
+    const deleteStartedAt = nowMs();
     await deleteVectorHashes(collectionId, config, hashesToDelete, signal);
+    backendDeleteMs += nowMs() - deleteStartedAt;
+    const insertStartedAt = nowMs();
     await insertVectorEntries(collectionId, config, entriesToInsert, signal);
+    backendInsertMs += nowMs() - insertStartedAt;
     for (const entry of entriesToInsert) {
       state.hashToNodeId[entry.hash] = entry.nodeId;
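
This non-reset branch is where the "native delta gating" of the commit title happens: desired hashes are diffed against the recorded mapping, and only the difference touches the backend. The loop that builds hashesToDelete and entriesToInsert sits outside this hunk; a self-contained sketch of the gating, assuming state.nodeToHash maps nodeId to the last-synced hash:

// Sketch with inline sample data standing in for real graph state.
const state = { nodeToHash: { a: "h1", b: "h2" } };
const desiredEntries = [
  { nodeId: "a", hash: "h1" }, // unchanged -> gated out entirely
  { nodeId: "b", hash: "h9" }, // changed   -> delete h2, insert h9
  { nodeId: "c", hash: "h3" }, // new       -> insert h3
];
const hashesToDelete = [];
const entriesToInsert = [];
for (const entry of desiredEntries) {
  const previousHash = state.nodeToHash[entry.nodeId];
  if (previousHash === entry.hash) continue;
  if (previousHash) hashesToDelete.push(previousHash);
  entriesToInsert.push(entry);
}
// hashesToDelete === ["h2"]; entriesToInsert holds b and c only.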
@@ -679,11 +749,14 @@ export async function syncGraphVectorIndex(
   let directSyncHadFailures = false;
   if (entriesToEmbed.length > 0) {
     throwIfAborted(signal);
+    embeddingsRequested = entriesToEmbed.length;
+    const embedStartedAt = nowMs();
     const embeddings = await embedBatch(
       entriesToEmbed.map((entry) => entry.text),
       config,
       { signal },
     );
+    embedBatchMs += nowMs() - embedStartedAt;
     for (let index = 0; index < entriesToEmbed.length; index++) {
       const entry = entriesToEmbed[index];
@@ -718,14 +791,34 @@ export async function syncGraphVectorIndex(
     state.lastWarning = "";
   }
   state.lastSyncAt = Date.now();
+  const statsBuildStartedAt = nowMs();
   state.lastStats = computeVectorStats(
     graph,
     buildDesiredVectorEntries(graph, config),
   );
+  const statsBuildMs = nowMs() - statsBuildStartedAt;
+  state.lastTimings = {
+    mode: syncMode,
+    desiredEntries: Number(desiredBuildDiagnostics.entryCount || desiredEntries.length),
+    desiredBuildMs: roundMs(desiredBuildMs),
+    textBuildMs: Number(desiredBuildDiagnostics.textBuildMs || 0),
+    hashBuildMs: Number(desiredBuildDiagnostics.hashBuildMs || 0),
+    backendPurgeMs: roundMs(backendPurgeMs),
+    backendDeleteMs: roundMs(backendDeleteMs),
+    backendInsertMs: roundMs(backendInsertMs),
+    embedBatchMs: roundMs(embedBatchMs),
+    statsBuildMs: roundMs(statsBuildMs),
+    deletedHashes: Math.max(0, Math.floor(deletedHashCount)),
+    insertedEntries: insertedHashes.length,
+    embeddingsRequested: Math.max(0, Math.floor(embeddingsRequested)),
+    totalMs: roundMs(nowMs() - syncStartedAt),
+    updatedAt: Date.now(),
+  };
   return {
     insertedHashes,
     stats: state.lastStats,
+    timings: state.lastTimings,
   };
 }
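
Because each phase is measured separately (and textBuildMs/hashBuildMs are sub-slices of desiredBuildMs), the time not attributed to any phase can be derived from the same object. A hypothetical helper over the lastTimings shape above:

// Hypothetical: residual time spent outside the instrumented phases.
function unattributedMs(timings) {
  const measured =
    (timings.desiredBuildMs || 0) +
    (timings.backendPurgeMs || 0) +
    (timings.backendDeleteMs || 0) +
    (timings.backendInsertMs || 0) +
    (timings.embedBatchMs || 0) +
    (timings.statsBuildMs || 0); // text/hash build excluded: already inside desiredBuildMs
  return Math.max(0, Math.round(((timings.totalMs || 0) - measured) * 10) / 10);
}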
@@ -743,14 +836,52 @@ export async function findSimilarNodesByText(
   const candidateNodes = Array.isArray(candidates)
     ? candidates
    : getEligibleVectorNodes(graph);
+  const searchStartedAt = nowMs();
+  const mode = isDirectVectorConfig(config) ? "direct" : "backend";
+  const recordSearchTimings = (patch = {}) => {
+    const state = graph?.vectorIndexState;
+    if (!state || typeof state !== "object" || Array.isArray(state)) return;
+    state.lastSearchTimings = {
+      ...(state.lastSearchTimings &&
+      typeof state.lastSearchTimings === "object" &&
+      !Array.isArray(state.lastSearchTimings)
+        ? state.lastSearchTimings
+        : {}),
+      mode,
+      queryLength: String(text || "").length,
+      candidateCount: candidateNodes.length,
+      topK: Math.max(1, Math.floor(Number(topK) || 1)),
+      ...patch,
+      totalMs: roundMs(nowMs() - searchStartedAt),
+      updatedAt: Date.now(),
+    };
+  };
-  if (candidateNodes.length === 0) return [];
+  if (candidateNodes.length === 0) {
+    recordSearchTimings({
+      success: true,
+      reason: "no-candidates",
+      resultCount: 0,
+    });
+    return [];
+  }
   if (isDirectVectorConfig(config)) {
+    const queryEmbedStartedAt = nowMs();
     const queryVec = await embedText(text, config, { signal });
-    if (!queryVec) return [];
+    const queryEmbedMs = nowMs() - queryEmbedStartedAt;
+    if (!queryVec) {
+      recordSearchTimings({
+        success: false,
+        reason: "direct-query-embed-empty",
+        queryEmbedMs: roundMs(queryEmbedMs),
+        resultCount: 0,
+      });
+      return [];
+    }
-    return searchSimilar(
+    const localSearchStartedAt = nowMs();
+    const results = searchSimilar(
       queryVec,
       candidateNodes
         .filter(
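
Note the spread order inside recordSearchTimings: the previous lastSearchTimings (when it is a plain object), then the baseline fields, then the caller's patch, and only then totalMs/updatedAt, so a patch can never freeze the clock fields. A tiny demonstration of that ordering:

// Later spreads win: totalMs from the patch is overwritten by the trailing field.
const previous = { mode: "direct", totalMs: 1.5 };
const patch = { reason: "ok", totalMs: 9999 };
const merged = { ...previous, mode: "backend", ...patch, totalMs: 3.2 };
console.log(merged); // { mode: "backend", totalMs: 3.2, reason: "ok" }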
@@ -762,12 +893,29 @@ export async function findSimilarNodesByText(
         })),
       topK,
     );
+    recordSearchTimings({
+      success: true,
+      reason: "ok",
+      queryEmbedMs: roundMs(queryEmbedMs),
+      searchMs: roundMs(nowMs() - localSearchStartedAt),
+      resultCount: results.length,
+    });
+    return results;
   }
   const validation = validateVectorConfig(config);
-  if (!validation.valid) return [];
+  if (!validation.valid) {
+    recordSearchTimings({
+      success: false,
+      reason: "vector-config-invalid",
+      error: validation.error,
+      resultCount: 0,
+    });
+    return [];
+  }
   try {
+    const requestStartedAt = nowMs();
     const response = await fetchWithTimeout(
       "/api/vector/query",
       {
@@ -784,6 +932,7 @@ export async function findSimilarNodesByText(
       },
       getConfiguredTimeoutMs(config),
     );
+    const requestMs = nowMs() - requestStartedAt;
     if (!response.ok) {
       const errorText = await response.text().catch(() => response.statusText);
@@ -795,23 +944,47 @@ export async function findSimilarNodesByText(
         "backend-query-failed",
         `Backend vector query failed (${message}); marked for rebuild`,
       );
+      recordSearchTimings({
+        success: false,
+        reason: "backend-query-http-failed",
+        statusCode: Number(response.status || 0),
+        requestMs: roundMs(requestMs),
+        error: message,
+        resultCount: 0,
+      });
       return [];
     }
+    const parseStartedAt = nowMs();
     const data = await response.json().catch(() => ({ hashes: [] }));
+    const parseMs = nowMs() - parseStartedAt;
     const hashes = Array.isArray(data?.hashes) ? data.hashes : [];
     const nodeIdByHash = graph.vectorIndexState?.hashToNodeId || {};
     const allowedIds = new Set(candidateNodes.map((node) => node.id));
-    return hashes
+    const results = hashes
       .map((hash, index) => ({
         nodeId: nodeIdByHash[hash],
         score: Math.max(0.01, 1 - index / Math.max(1, hashes.length)),
       }))
       .filter((entry) => entry.nodeId && allowedIds.has(entry.nodeId))
       .slice(0, topK);
+    recordSearchTimings({
+      success: true,
+      reason: "ok",
+      requestMs: roundMs(requestMs),
+      parseMs: roundMs(parseMs),
+      resultCount: results.length,
+      hashCount: hashes.length,
+    });
+    return results;
   } catch (error) {
     if (isAbortError(error)) {
+      recordSearchTimings({
+        success: false,
+        reason: "aborted",
+        error: error?.message || String(error),
+      });
       throw error;
     }
     const message = error?.message || String(error) || "Backend vector query failed";
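
The backend returns hashes in rank order without scores, so the client synthesizes a decreasing pseudo-score, Math.max(0.01, 1 - index / Math.max(1, hashes.length)), floored at 0.01. Worked values for a four-hash response:

// Rank-based pseudo-scores for hashes.length === 4.
[0, 1, 2, 3].map((i) => Math.max(0.01, 1 - i / 4)); // [1, 0.75, 0.5, 0.25]
// The 0.01 floor only matters in degenerate cases; the inner Math.max(1, ...)
// guards the division when the hash list is empty.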
@@ -821,6 +994,11 @@ export async function findSimilarNodesByText(
       "backend-query-failed",
       `Backend vector query failed (${message}); marked for rebuild`,
     );
+    recordSearchTimings({
+      success: false,
+      reason: "backend-query-exception",
+      error: message,
+    });
     throw error;
   }
 }
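
As on the sync path, the search path leaves its last measurement on the graph state, so a UI or logging layer can surface retrieval latency without extra plumbing. A hypothetical read of the lastSearchTimings shape recorded above:

// Hypothetical consumer of the search telemetry left on vectorIndexState.
const t = graph?.vectorIndexState?.lastSearchTimings;
if (t) {
  console.debug(
    `[vector-search] mode=${t.mode} ok=${t.success} reason=${t.reason} ` +
      `results=${t.resultCount ?? 0} total=${t.totalMs}ms`,
  );
}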