feat: harden Luker sidecar persistence flow

This commit is contained in:
Youzini-afk
2026-04-15 13:45:57 +08:00
parent f898caf69c
commit 360dfe3f19
11 changed files with 2761 additions and 62 deletions

View File

@@ -14,6 +14,13 @@ export const GRAPH_COMMIT_MARKER_KEY = "st_bme_commit_marker";
export const GRAPH_CHAT_STATE_NAMESPACE = `${MODULE_NAME}_graph_state`;
export const GRAPH_CHAT_STATE_VERSION = 1;
export const GRAPH_CHAT_STATE_MAX_OPERATIONS = 4000;
export const LUKER_GRAPH_MANIFEST_NAMESPACE = `${MODULE_NAME}_graph_manifest`;
export const LUKER_GRAPH_JOURNAL_NAMESPACE = `${MODULE_NAME}_graph_journal`;
export const LUKER_GRAPH_CHECKPOINT_NAMESPACE = `${MODULE_NAME}_graph_checkpoint`;
export const LUKER_GRAPH_SIDECAR_V2_FORMAT = 2;
export const LUKER_GRAPH_JOURNAL_COMPACTION_DEPTH = 32;
export const LUKER_GRAPH_JOURNAL_COMPACTION_BYTES = 2 * 1024 * 1024;
export const LUKER_GRAPH_JOURNAL_COMPACTION_REVISION_GAP = 64;
export const GRAPH_PERSISTENCE_META_KEY = "__stBmePersistence";
export const GRAPH_LOAD_STATES = Object.freeze({
NO_CHAT: "no-chat",
@@ -385,6 +392,689 @@ export function canUseGraphChatState(context = null) {
);
}
function canBatchReadGraphChatState(context = null) {
return (
!!context &&
typeof context.getChatStateBatch === "function"
);
}
function normalizeGraphCountSummary(value = {}) {
const nodeCount = Number(value?.nodeCount ?? value?.nodes);
const edgeCount = Number(value?.edgeCount ?? value?.edges);
const archivedCount = Number(value?.archivedCount ?? value?.archivedNodes);
const tombstoneCount = Number(value?.tombstoneCount ?? value?.tombstones);
return {
nodeCount: Number.isFinite(nodeCount) && nodeCount >= 0 ? Math.floor(nodeCount) : 0,
edgeCount: Number.isFinite(edgeCount) && edgeCount >= 0 ? Math.floor(edgeCount) : 0,
archivedCount:
Number.isFinite(archivedCount) && archivedCount >= 0
? Math.floor(archivedCount)
: 0,
tombstoneCount:
Number.isFinite(tombstoneCount) && tombstoneCount >= 0
? Math.floor(tombstoneCount)
: 0,
};
}
function normalizeGraphCountSummaryFromGraph(graph = null) {
const stats = graph ? getGraphStats(graph) : null;
return normalizeGraphCountSummary({
nodeCount: Number(stats?.activeNodes || 0),
edgeCount: Number(stats?.totalEdges || 0),
archivedCount: Number(stats?.archivedNodes || 0),
tombstoneCount: Number(stats?.tombstones || 0),
});
}
function clonePlainObjectArray(value) {
return Array.isArray(value)
? value
.filter((item) => item && typeof item === "object" && !Array.isArray(item))
.map((item) => cloneRuntimeDebugValue(item, item))
: [];
}
function cloneStringArray(value) {
return Array.isArray(value)
? value.map((item) => String(item || "").trim()).filter(Boolean)
: [];
}
function normalizeChatStatePersistDelta(delta = null) {
if (!delta || typeof delta !== "object" || Array.isArray(delta)) {
return {
upsertNodes: [],
upsertEdges: [],
deleteNodeIds: [],
deleteEdgeIds: [],
tombstones: [],
runtimeMetaPatch: {},
countDelta: null,
};
}
const runtimeMetaPatch =
delta.runtimeMetaPatch &&
typeof delta.runtimeMetaPatch === "object" &&
!Array.isArray(delta.runtimeMetaPatch)
? cloneRuntimeDebugValue(delta.runtimeMetaPatch, {})
: {};
const countDelta =
delta.countDelta &&
typeof delta.countDelta === "object" &&
!Array.isArray(delta.countDelta)
? cloneRuntimeDebugValue(delta.countDelta, {})
: null;
return {
upsertNodes: clonePlainObjectArray(delta.upsertNodes),
upsertEdges: clonePlainObjectArray(delta.upsertEdges),
deleteNodeIds: cloneStringArray(delta.deleteNodeIds),
deleteEdgeIds: cloneStringArray(delta.deleteEdgeIds),
tombstones: clonePlainObjectArray(delta.tombstones),
runtimeMetaPatch,
countDelta,
};
}
function stringifyJsonByteLength(value) {
try {
return JSON.stringify(value).length;
} catch {
return 0;
}
}
export function normalizeLukerGraphJournalEntry(entry = null) {
if (!entry || typeof entry !== "object" || Array.isArray(entry)) {
return null;
}
const revision = Number(entry.revision);
const reason = String(entry.reason || "");
const persistedAt = String(entry.persistedAt || entry.updatedAt || "");
const storageTier = String(entry.storageTier || "luker-chat-state");
const chatId = normalizeIdentityValue(entry.chatId);
const integrity = normalizeIdentityValue(entry.integrity);
const persistDelta = normalizeChatStatePersistDelta(entry.persistDelta);
const hasDeltaPayload =
persistDelta.upsertNodes.length > 0 ||
persistDelta.upsertEdges.length > 0 ||
persistDelta.deleteNodeIds.length > 0 ||
persistDelta.deleteEdgeIds.length > 0 ||
persistDelta.tombstones.length > 0 ||
Object.keys(persistDelta.runtimeMetaPatch).length > 0;
if (!Number.isFinite(revision) || revision <= 0 || !hasDeltaPayload) {
return null;
}
return {
revision: Math.floor(revision),
reason,
persistedAt,
storageTier,
chatId,
integrity,
persistDelta,
countDelta:
persistDelta.countDelta &&
typeof persistDelta.countDelta === "object" &&
!Array.isArray(persistDelta.countDelta)
? cloneRuntimeDebugValue(persistDelta.countDelta, {})
: null,
byteLength: stringifyJsonByteLength({
revision: Math.floor(revision),
reason,
persistedAt,
storageTier,
chatId,
integrity,
persistDelta,
}),
};
}
export function normalizeLukerGraphJournalV2(payload = null) {
if (!payload || typeof payload !== "object" || Array.isArray(payload)) {
return null;
}
const formatVersion = Number(payload.formatVersion || payload.version);
const chatId = normalizeIdentityValue(payload.chatId);
const integrity = normalizeIdentityValue(payload.integrity);
const entries = Array.isArray(payload.entries)
? payload.entries
.map((entry) => normalizeLukerGraphJournalEntry(entry))
.filter(Boolean)
.sort((left, right) => left.revision - right.revision)
: [];
const latestEntry = entries.length > 0 ? entries[entries.length - 1] : null;
const headRevision = Number(payload.headRevision || latestEntry?.revision || 0);
return {
formatVersion:
Number.isFinite(formatVersion) && formatVersion > 0
? Math.floor(formatVersion)
: LUKER_GRAPH_SIDECAR_V2_FORMAT,
chatId,
integrity,
headRevision:
Number.isFinite(headRevision) && headRevision >= 0
? Math.floor(headRevision)
: Number(latestEntry?.revision || 0),
updatedAt: String(payload.updatedAt || latestEntry?.persistedAt || ""),
entries,
entryCount: entries.length,
totalBytes: entries.reduce(
(sum, entry) => sum + Number(entry?.byteLength || 0),
0,
),
};
}
export function buildLukerGraphJournalEntry(
delta = null,
{
revision = 0,
reason = "",
storageTier = "luker-chat-state",
chatId = "",
integrity = "",
persistedAt = "",
} = {},
) {
return normalizeLukerGraphJournalEntry({
revision,
reason,
persistedAt: String(persistedAt || new Date().toISOString()),
storageTier,
chatId,
integrity,
persistDelta: normalizeChatStatePersistDelta(delta),
});
}
export function buildLukerGraphJournalV2(entries = [], metadata = {}) {
return normalizeLukerGraphJournalV2({
formatVersion: LUKER_GRAPH_SIDECAR_V2_FORMAT,
chatId: metadata.chatId,
integrity: metadata.integrity,
headRevision: metadata.headRevision,
updatedAt: metadata.updatedAt || new Date().toISOString(),
entries: Array.isArray(entries) ? entries : [],
});
}
export function normalizeLukerGraphCheckpointV2(payload = null) {
if (!payload || typeof payload !== "object" || Array.isArray(payload)) {
return null;
}
const formatVersion = Number(payload.formatVersion || payload.version);
const revision = Number(payload.revision);
const serializedGraph = String(payload.serializedGraph || "");
const chatId = normalizeIdentityValue(payload.chatId);
const integrity = normalizeIdentityValue(payload.integrity);
const counts = normalizeGraphCountSummary(payload.counts);
if (!serializedGraph) {
return null;
}
return {
formatVersion:
Number.isFinite(formatVersion) && formatVersion > 0
? Math.floor(formatVersion)
: LUKER_GRAPH_SIDECAR_V2_FORMAT,
revision: Number.isFinite(revision) && revision > 0 ? Math.floor(revision) : 0,
serializedGraph,
chatId,
integrity,
counts,
persistedAt: String(payload.persistedAt || payload.updatedAt || ""),
updatedAt: String(payload.updatedAt || payload.persistedAt || ""),
reason: String(payload.reason || ""),
storageTier: String(payload.storageTier || "luker-chat-state"),
};
}
export function buildLukerGraphCheckpointV2(
graph,
{
revision = 0,
chatId = "",
integrity = "",
reason = "",
storageTier = "luker-chat-state",
persistedAt = "",
} = {},
) {
if (!graph) return null;
return normalizeLukerGraphCheckpointV2({
formatVersion: LUKER_GRAPH_SIDECAR_V2_FORMAT,
revision,
chatId,
integrity,
reason,
storageTier,
counts: normalizeGraphCountSummaryFromGraph(graph),
persistedAt: String(persistedAt || new Date().toISOString()),
updatedAt: String(persistedAt || new Date().toISOString()),
serializedGraph: serializeGraph(graph),
});
}
export function normalizeLukerGraphManifestV2(payload = null) {
if (!payload || typeof payload !== "object" || Array.isArray(payload)) {
return null;
}
const formatVersion = Number(payload.formatVersion || payload.version);
const baseRevision = Number(payload.baseRevision);
const headRevision = Number(payload.headRevision);
const checkpointRevision = Number(payload.checkpointRevision);
const lastCompactedRevision = Number(payload.lastCompactedRevision);
const lastProcessedAssistantFloor = Number(payload.lastProcessedAssistantFloor);
const extractionCount = Number(payload.extractionCount);
const journalDepth = Number(payload.journalDepth);
const journalBytes = Number(payload.journalBytes);
const chatId = normalizeIdentityValue(payload.chatId);
const integrity = normalizeIdentityValue(payload.integrity);
const counts = normalizeGraphCountSummary(payload.counts);
return {
formatVersion:
Number.isFinite(formatVersion) && formatVersion > 0
? Math.floor(formatVersion)
: LUKER_GRAPH_SIDECAR_V2_FORMAT,
baseRevision: Number.isFinite(baseRevision) && baseRevision >= 0 ? Math.floor(baseRevision) : 0,
headRevision: Number.isFinite(headRevision) && headRevision >= 0 ? Math.floor(headRevision) : 0,
checkpointRevision:
Number.isFinite(checkpointRevision) && checkpointRevision >= 0
? Math.floor(checkpointRevision)
: 0,
lastCompactedRevision:
Number.isFinite(lastCompactedRevision) && lastCompactedRevision >= 0
? Math.floor(lastCompactedRevision)
: 0,
journalDepth:
Number.isFinite(journalDepth) && journalDepth >= 0 ? Math.floor(journalDepth) : 0,
journalBytes:
Number.isFinite(journalBytes) && journalBytes >= 0 ? Math.floor(journalBytes) : 0,
lastProcessedAssistantFloor:
Number.isFinite(lastProcessedAssistantFloor)
? Math.floor(lastProcessedAssistantFloor)
: -1,
extractionCount:
Number.isFinite(extractionCount) && extractionCount >= 0
? Math.floor(extractionCount)
: 0,
chatId,
integrity,
counts,
storageTier: String(payload.storageTier || "luker-chat-state"),
accepted: payload.accepted === true,
persistedAt: String(payload.persistedAt || payload.updatedAt || ""),
updatedAt: String(payload.updatedAt || payload.persistedAt || ""),
reason: String(payload.reason || ""),
compactionState:
payload.compactionState && typeof payload.compactionState === "object"
? cloneRuntimeDebugValue(payload.compactionState, {})
: {
state: "idle",
lastAt: 0,
lastReason: "",
error: "",
},
};
}
export function buildLukerGraphManifestV2(
graph,
{
baseRevision = 0,
headRevision = 0,
checkpointRevision = 0,
lastCompactedRevision = 0,
journalDepth = 0,
journalBytes = 0,
chatId = "",
integrity = "",
reason = "",
storageTier = "luker-chat-state",
accepted = true,
persistedAt = "",
updatedAt = "",
lastProcessedAssistantFloor = null,
extractionCount = null,
compactionState = null,
} = {},
) {
const stats = graph ? getGraphStats(graph) : null;
const historyState = graph?.historyState || {};
const nextCounts =
graph != null
? normalizeGraphCountSummaryFromGraph(graph)
: normalizeGraphCountSummary();
return normalizeLukerGraphManifestV2({
formatVersion: LUKER_GRAPH_SIDECAR_V2_FORMAT,
baseRevision,
headRevision,
checkpointRevision,
lastCompactedRevision,
journalDepth,
journalBytes,
chatId,
integrity,
counts: nextCounts,
storageTier,
accepted,
persistedAt: String(persistedAt || new Date().toISOString()),
updatedAt: String(updatedAt || persistedAt || new Date().toISOString()),
reason,
lastProcessedAssistantFloor:
lastProcessedAssistantFloor != null
? lastProcessedAssistantFloor
: Number.isFinite(Number(historyState.lastProcessedAssistantFloor))
? Number(historyState.lastProcessedAssistantFloor)
: Number.isFinite(Number(stats?.lastProcessedSeq))
? Number(stats.lastProcessedSeq)
: -1,
extractionCount:
extractionCount != null
? extractionCount
: Number.isFinite(Number(historyState.extractionCount))
? Number(historyState.extractionCount)
: 0,
compactionState,
});
}
async function readGraphChatStateNamespaces(
context = null,
namespaces = [],
) {
if (!canUseGraphChatState(context) || !Array.isArray(namespaces) || namespaces.length === 0) {
return new Map();
}
try {
if (canBatchReadGraphChatState(context)) {
const batch = await context.getChatStateBatch(namespaces);
if (batch instanceof Map) {
return batch;
}
if (batch && typeof batch === "object") {
return new Map(Object.entries(batch));
}
}
} catch (error) {
console.warn("[ST-BME] 批量读取聊天侧车失败,回退逐项读取:", error);
}
const result = new Map();
for (const namespace of namespaces) {
try {
result.set(namespace, await context.getChatState(namespace));
} catch {
result.set(namespace, null);
}
}
return result;
}
async function writeGraphChatStatePayload(
context = null,
namespace = "",
payload = null,
{ maxOperations = GRAPH_CHAT_STATE_MAX_OPERATIONS, asyncDiff = false } = {},
) {
if (!canUseGraphChatState(context) || !namespace || !payload) {
return {
ok: false,
updated: false,
reason: "chat-state-unavailable",
payload: null,
};
}
try {
const result = await context.updateChatState(
namespace,
() => cloneRuntimeDebugValue(payload, payload),
{
maxOperations,
asyncDiff,
maxRetries: 1,
},
);
return {
ok: result?.ok === true,
updated: result?.updated !== false,
reason:
result?.ok === true
? result?.updated === false
? "chat-state-noop"
: "chat-state-saved"
: "chat-state-save-failed",
payload,
};
} catch (error) {
console.warn(`[ST-BME] 写入聊天侧车 ${namespace} 失败:`, error);
return {
ok: false,
updated: false,
reason: "chat-state-save-failed",
error,
payload,
};
}
}
export async function readLukerGraphSidecarV2(
context = null,
{
manifestNamespace = LUKER_GRAPH_MANIFEST_NAMESPACE,
journalNamespace = LUKER_GRAPH_JOURNAL_NAMESPACE,
checkpointNamespace = LUKER_GRAPH_CHECKPOINT_NAMESPACE,
} = {},
) {
if (!canUseGraphChatState(context)) {
return {
manifest: null,
journal: null,
checkpoint: null,
};
}
const payloads = await readGraphChatStateNamespaces(context, [
manifestNamespace,
journalNamespace,
checkpointNamespace,
]);
return {
manifest: normalizeLukerGraphManifestV2(payloads.get(manifestNamespace) || null),
journal: normalizeLukerGraphJournalV2(payloads.get(journalNamespace) || null),
checkpoint: normalizeLukerGraphCheckpointV2(payloads.get(checkpointNamespace) || null),
};
}
export async function writeLukerGraphManifestV2(
context = null,
manifest = null,
{
namespace = LUKER_GRAPH_MANIFEST_NAMESPACE,
maxOperations = 512,
} = {},
) {
const normalizedManifest = normalizeLukerGraphManifestV2(manifest);
if (!normalizedManifest) {
return {
ok: false,
updated: false,
reason: "chat-state-build-failed",
manifest: null,
};
}
const result = await writeGraphChatStatePayload(context, namespace, normalizedManifest, {
maxOperations,
asyncDiff: false,
});
return {
...result,
manifest: normalizedManifest,
};
}
export async function appendLukerGraphJournalEntryV2(
context = null,
entry = null,
{
namespace = LUKER_GRAPH_JOURNAL_NAMESPACE,
chatId = "",
integrity = "",
maxOperations = GRAPH_CHAT_STATE_MAX_OPERATIONS,
} = {},
) {
const normalizedEntry = normalizeLukerGraphJournalEntry(entry);
if (!normalizedEntry || !canUseGraphChatState(context)) {
return {
ok: false,
updated: false,
reason: "chat-state-build-failed",
journal: null,
entry: null,
};
}
try {
const result = await context.updateChatState(
namespace,
(current = {}) => {
const normalizedCurrent =
normalizeLukerGraphJournalV2(current) ||
buildLukerGraphJournalV2([], {
chatId,
integrity,
headRevision: 0,
});
const existingEntries = Array.isArray(normalizedCurrent.entries)
? normalizedCurrent.entries.filter(
(candidate) => Number(candidate?.revision || 0) !== normalizedEntry.revision,
)
: [];
const nextEntries = [...existingEntries, normalizedEntry].sort(
(left, right) => left.revision - right.revision,
);
const nextJournal = buildLukerGraphJournalV2(nextEntries, {
chatId:
normalizeIdentityValue(chatId) ||
normalizedCurrent.chatId ||
normalizedEntry.chatId,
integrity:
normalizeIdentityValue(integrity) ||
normalizedCurrent.integrity ||
normalizedEntry.integrity,
headRevision: normalizedEntry.revision,
updatedAt: normalizedEntry.persistedAt,
});
return nextJournal;
},
{
maxOperations,
asyncDiff: false,
maxRetries: 1,
},
);
const journal = await readGraphChatStateNamespaces(context, [namespace]);
return {
ok: result?.ok === true,
updated: result?.updated !== false,
reason:
result?.ok === true
? result?.updated === false
? "chat-state-noop"
: "chat-state-saved"
: "chat-state-save-failed",
journal: normalizeLukerGraphJournalV2(journal.get(namespace) || null),
entry: normalizedEntry,
};
} catch (error) {
console.warn("[ST-BME] 追加 Luker graph journal 失败:", error);
return {
ok: false,
updated: false,
reason: "chat-state-save-failed",
error,
journal: null,
entry: normalizedEntry,
};
}
}
export async function replaceLukerGraphJournalV2(
context = null,
journal = null,
{
namespace = LUKER_GRAPH_JOURNAL_NAMESPACE,
maxOperations = GRAPH_CHAT_STATE_MAX_OPERATIONS,
} = {},
) {
const normalizedJournal = normalizeLukerGraphJournalV2(journal);
if (!normalizedJournal) {
return {
ok: false,
updated: false,
reason: "chat-state-build-failed",
journal: null,
};
}
const result = await writeGraphChatStatePayload(context, namespace, normalizedJournal, {
maxOperations,
asyncDiff: false,
});
return {
...result,
journal: normalizedJournal,
};
}
export async function writeLukerGraphCheckpointV2(
context = null,
checkpoint = null,
{
namespace = LUKER_GRAPH_CHECKPOINT_NAMESPACE,
maxOperations = GRAPH_CHAT_STATE_MAX_OPERATIONS,
} = {},
) {
const normalizedCheckpoint = normalizeLukerGraphCheckpointV2(checkpoint);
if (!normalizedCheckpoint) {
return {
ok: false,
updated: false,
reason: "chat-state-build-failed",
checkpoint: null,
};
}
const result = await writeGraphChatStatePayload(context, namespace, normalizedCheckpoint, {
maxOperations,
asyncDiff: false,
});
return {
...result,
checkpoint: normalizedCheckpoint,
};
}
export function normalizeGraphChatStateSnapshot(snapshot = null) {
if (!snapshot || typeof snapshot !== "object" || Array.isArray(snapshot)) {
return null;