Integrate Authority Blob storage

This commit is contained in:
Youzini-afk
2026-04-28 12:52:52 +08:00
parent 322804a12a
commit d7cbbb20c1
5 changed files with 1531 additions and 194 deletions

235
index.js
View File

@@ -373,6 +373,10 @@ import {
createAuthorityJobAdapter,
normalizeAuthorityJobConfig,
} from "./maintenance/authority-job-adapter.js";
import {
createAuthorityBlobAdapter,
normalizeAuthorityBlobConfig,
} from "./maintenance/authority-blob-adapter.js";
export { DEFAULT_TRIGGER_KEYWORDS, getSmartTriggerDecision };
@@ -1505,6 +1509,7 @@ function buildAuthorityPersistenceStatePatch(settings = getSettings()) {
authorityStoragePrimaryReady: Boolean(capability.storagePrimaryReady),
authorityTriviumPrimaryReady: Boolean(capability.triviumPrimaryReady),
authorityJobsReady: Boolean(capability.jobsReady),
authorityBlobReady: Boolean(capability.blobReady),
authorityBrowserCacheMode: String(browserState.mode || "minimal"),
authorityOfflineQueueBytes: Number(browserState.offlineQueueBytes || 0),
authorityOfflineQueueItems: Number(browserState.offlineQueueItems || 0),
@@ -1965,6 +1970,206 @@ function recordAuthorityJobSnapshot(job = null, options = {}) {
});
}
function recordAuthorityBlobSnapshot(event = {}) {
const normalizedEvent =
event && typeof event === "object" && !Array.isArray(event) ? event : {};
updateGraphPersistenceState({
authorityBlobState: normalizedEvent.ok === false ? "error" : "active",
authorityLastBlobEvent: cloneRuntimeDebugValue(normalizedEvent, null),
authorityLastBlobAction: String(normalizedEvent.action || ""),
authorityLastBlobBackend: String(normalizedEvent.backend || ""),
authorityLastBlobPath: String(normalizedEvent.path || ""),
authorityLastBlobReason: String(normalizedEvent.reason || ""),
authorityLastBlobError: String(normalizedEvent.error || ""),
authorityLastBlobUpdatedAt: String(
normalizedEvent.updatedAt || new Date().toISOString(),
),
});
}
function buildAuthorityBlobFileHash(input = "") {
let hash = 2166136261;
const text = String(input ?? "");
for (let index = 0; index < text.length; index += 1) {
hash ^= text.charCodeAt(index);
hash = Math.imul(hash, 16777619);
}
return (hash >>> 0).toString(36);
}
function buildAuthorityBlobSafeSlug(input = "", fallback = "unknown") {
const normalized = String(input || fallback)
.trim()
.replace(/[^A-Za-z0-9._-]+/g, "_")
.replace(/_+/g, "_")
.replace(/^[_.-]+|[_.-]+$/g, "")
.slice(0, 96);
return normalized || fallback;
}
function shouldUseAuthorityBlobCheckpoint() {
const settings = getSettings();
const authoritySettings = normalizeAuthoritySettings(settings);
const { capability } = getAuthorityRuntimeSnapshot(settings);
return Boolean(
authoritySettings.enabled &&
authoritySettings.blobCheckpointEnabled &&
capability.blobReady,
);
}
function getAuthorityBlobAdapter(options = {}) {
const settings = getSettings();
const config = normalizeAuthorityBlobConfig(settings);
return createAuthorityBlobAdapter(config, {
fetchImpl: globalThis.fetch?.bind(globalThis),
headerProvider:
typeof getRequestHeaders === "function" ? () => getRequestHeaders() : null,
...options,
});
}
async function writeAuthorityLukerCheckpointBlob(
checkpoint = null,
{ chatId = "", reason = "luker-checkpoint", signal = undefined } = {},
) {
if (!checkpoint || !shouldUseAuthorityBlobCheckpoint()) {
return {
ok: false,
reason: "authority-blob-unavailable",
};
}
const normalizedChatId = normalizeChatIdCandidate(chatId || checkpoint.chatId);
const safeChatId = buildAuthorityBlobSafeSlug(normalizedChatId);
const hash = buildAuthorityBlobFileHash(normalizedChatId || safeChatId);
const path = `user/files/ST-BME_luker_checkpoint_${safeChatId}-${hash}.json`;
try {
const adapter = getAuthorityBlobAdapter();
const result = await adapter.writeJson(path, checkpoint, {
signal,
metadata: {
chatId: normalizedChatId,
revision: Number(checkpoint?.revision || 0),
reason: String(reason || ""),
kind: "luker-checkpoint",
},
});
const event = {
action: "checkpoint-write",
ok: result?.ok !== false,
backend: "authority-blob",
path: result?.path || path,
reason: String(reason || ""),
revision: Number(checkpoint?.revision || 0),
};
recordAuthorityBlobSnapshot(event);
updateGraphPersistenceState({
authorityBlobCheckpointPath: event.path,
authorityBlobCheckpointRevision: event.revision,
authorityBlobCheckpointUpdatedAt: new Date().toISOString(),
});
return {
ok: event.ok,
path: event.path,
result,
};
} catch (error) {
const message = error?.message || String(error) || "Authority Blob checkpoint failed";
recordAuthorityBlobSnapshot({
action: "checkpoint-write",
ok: false,
backend: "authority-blob",
path,
reason: String(reason || ""),
error: message,
revision: Number(checkpoint?.revision || 0),
});
return {
ok: false,
path,
reason: "authority-blob-checkpoint-error",
error,
};
}
}
async function readAuthorityLukerCheckpointBlob(chatId = "", options = {}) {
if (!shouldUseAuthorityBlobCheckpoint()) {
return {
ok: false,
exists: false,
reason: "authority-blob-unavailable",
};
}
const normalizedChatId = normalizeChatIdCandidate(chatId);
if (!normalizedChatId) {
return {
ok: false,
exists: false,
reason: "missing-chat-id",
};
}
const safeChatId = buildAuthorityBlobSafeSlug(normalizedChatId);
const hash = buildAuthorityBlobFileHash(normalizedChatId || safeChatId);
const path = `user/files/ST-BME_luker_checkpoint_${safeChatId}-${hash}.json`;
try {
const adapter = getAuthorityBlobAdapter();
const result = await adapter.readJson(path, options);
const exists = Boolean(result?.exists && result?.payload);
recordAuthorityBlobSnapshot({
action: "checkpoint-read",
ok: result?.ok !== false,
backend: "authority-blob",
path: result?.path || path,
reason: exists ? "checkpoint-found" : "checkpoint-missing",
revision: Number(result?.payload?.revision || 0),
});
return {
ok: result?.ok !== false,
exists,
path: result?.path || path,
checkpoint: exists ? result.payload : null,
result,
};
} catch (error) {
const message = error?.message || String(error) || "Authority Blob checkpoint read failed";
recordAuthorityBlobSnapshot({
action: "checkpoint-read",
ok: false,
backend: "authority-blob",
path,
reason: "authority-blob-checkpoint-read-error",
error: message,
});
return {
ok: false,
exists: false,
path,
reason: "authority-blob-checkpoint-read-error",
error,
};
}
}
async function readLukerGraphSidecarV2WithAuthorityBlob(context = null, options = {}) {
const sidecar = await readLukerGraphSidecarV2(context, options);
if (sidecar?.checkpoint) return sidecar;
const chatId =
normalizeChatIdCandidate(options.chatId) ||
normalizeChatIdCandidate(sidecar?.manifest?.chatId) ||
normalizeChatIdCandidate(getCurrentChatId(context));
const blobResult = await readAuthorityLukerCheckpointBlob(chatId);
if (!blobResult?.exists || !blobResult?.checkpoint) return sidecar;
return {
...(sidecar || {}),
checkpoint: blobResult.checkpoint,
authorityBlobCheckpoint: {
path: blobResult.path,
backend: "authority-blob",
},
};
}
async function submitAuthorityVectorRebuildJob({
config = null,
range = null,
@@ -6448,6 +6653,9 @@ async function refreshRuntimeGraphAfterSyncApplied(syncPayload = {}) {
function buildBmeSyncRuntimeOptions(extra = {}) {
const normalizedExtra =
extra && typeof extra === "object" && !Array.isArray(extra) ? extra : {};
const settings = getSettings();
const authoritySettings = normalizeAuthoritySettings(settings);
const { capability } = getAuthorityRuntimeSnapshot(settings);
const defaultOptions = {
getDb: async (chatId) => {
const manager = ensureBmeChatManager();
@@ -6468,6 +6676,16 @@ function buildBmeSyncRuntimeOptions(extra = {}) {
getCurrentChatId: () => getCurrentChatId(),
getCloudStorageMode: () => getSettings().cloudStorageMode || "automatic",
getRequestHeaders,
authorityBlobEnabled: Boolean(
authoritySettings.enabled &&
authoritySettings.blobCheckpointEnabled &&
capability.blobReady,
),
authorityBlobFailOpen: authoritySettings.failOpen,
authorityBlobConfig: {
...authoritySettings,
},
onAuthorityBlobEvent: recordAuthorityBlobSnapshot,
onSyncApplied: async (payload = {}) => {
await refreshRuntimeGraphAfterSyncApplied(payload);
},
@@ -7436,6 +7654,10 @@ async function compactLukerGraphSidecarV2(
error: checkpointResult?.error || null,
};
}
await writeAuthorityLukerCheckpointBlob(checkpointResult.checkpoint, {
chatId: normalizedChatId,
reason,
});
const emptyJournal = buildLukerGraphJournalV2([], {
chatId: normalizedChatId,
@@ -7651,11 +7873,12 @@ async function persistGraphToLukerSidecarV2(
? cloneRuntimeDebugValue(persistDelta, persistDelta)
: null;
const existingSidecar = await readLukerGraphSidecarV2(context, {
const existingSidecar = await readLukerGraphSidecarV2WithAuthorityBlob(context, {
manifestNamespace: LUKER_GRAPH_MANIFEST_NAMESPACE,
journalNamespace: LUKER_GRAPH_JOURNAL_NAMESPACE,
checkpointNamespace: LUKER_GRAPH_CHECKPOINT_NAMESPACE,
chatStateTarget: normalizedTarget,
chatId,
});
if (existingSidecar?.manifest) {
cacheChatStateManifest(chatId, existingSidecar.manifest);
@@ -7742,6 +7965,10 @@ async function persistGraphToLukerSidecarV2(
error: checkpointResult?.error || null,
};
}
await writeAuthorityLukerCheckpointBlob(checkpointResult.checkpoint, {
chatId,
reason: `${reason}:bootstrap`,
});
const emptyJournal = buildLukerGraphJournalV2([], {
chatId,
integrity: nextIntegrity,
@@ -8071,11 +8298,12 @@ async function loadGraphFromLukerSidecarV2(
};
}
const sidecar = await readLukerGraphSidecarV2(context, {
const sidecar = await readLukerGraphSidecarV2WithAuthorityBlob(context, {
manifestNamespace: LUKER_GRAPH_MANIFEST_NAMESPACE,
journalNamespace: LUKER_GRAPH_JOURNAL_NAMESPACE,
checkpointNamespace: LUKER_GRAPH_CHECKPOINT_NAMESPACE,
chatStateTarget: normalizedTarget,
chatId: normalizedChatId,
});
const manifest = sidecar?.manifest || null;
if (!manifest) {
@@ -8784,8 +9012,9 @@ async function readPersistedGraphForChatStateTarget(
return null;
}
const sidecar = await readLukerGraphSidecarV2(context, {
const sidecar = await readLukerGraphSidecarV2WithAuthorityBlob(context, {
chatStateTarget: normalizedTarget,
chatId: targetChatId,
});
const sidecarResult = buildSnapshotFromLukerSidecarState(sidecar, {
chatId: targetChatId,