feat(authority): harden jobs repair and diagnostics

This commit is contained in:
Youzini-afk
2026-04-28 19:34:28 +08:00
parent 080813ae46
commit 8979b89646
12 changed files with 1403 additions and 13 deletions

View File

@@ -63,6 +63,56 @@ function normalizeJobStatus(value = "queued") {
return String(value || "queued").trim().toLowerCase() || "queued";
}
function hasAsyncIterator(value = null) {
return !!value && typeof value[Symbol.asyncIterator] === "function";
}
function hasIterator(value = null) {
return !!value && typeof value[Symbol.iterator] === "function";
}
function readJobStreamPayload(event = null) {
const source = event && typeof event === "object" && !Array.isArray(event) ? event : null;
if (!source) return event;
if (source.job && typeof source.job === "object" && !Array.isArray(source.job)) {
return source.job;
}
if (source.result && typeof source.result === "object" && !Array.isArray(source.result)) {
return source.result;
}
if (source.payload && typeof source.payload === "object" && !Array.isArray(source.payload)) {
return source.payload;
}
if (source.data && typeof source.data === "object" && !Array.isArray(source.data)) {
return source.data;
}
return source;
}
async function* normalizeAuthorityJobStream(source = null, options = {}) {
if (hasAsyncIterator(source)) {
for await (const event of source) {
throwIfAborted(options.signal);
const job = normalizeAuthorityJobRecord(readJobStreamPayload(event));
if (job.id || job.status || job.error) {
yield job;
}
}
return;
}
if (hasIterator(source)) {
for (const event of source) {
throwIfAborted(options.signal);
const job = normalizeAuthorityJobRecord(readJobStreamPayload(event));
if (job.id || job.status || job.error) {
yield job;
}
}
return;
}
throw new Error("Authority Jobs stream unavailable");
}
function readJobRows(payload = null) {
if (Array.isArray(payload)) return payload;
if (!payload || typeof payload !== "object") return [];
@@ -192,6 +242,7 @@ export function normalizeAuthorityJobConfig(settings = {}, overrides = {}) {
baseUrl: normalizeAuthorityBaseUrl(source.authorityBaseUrl ?? source.baseUrl),
enabled: source.authorityJobsEnabled !== false && source.jobsEnabled !== false,
failOpen: source.authorityFailOpen !== false && source.failOpen !== false,
preferStream: source.authorityJobPreferStream !== false && source.jobStreamPreferred !== false,
pollIntervalMs: normalizeInteger(source.authorityJobPollIntervalMs ?? source.pollIntervalMs, 1200, 250, 30000),
waitTimeoutMs: normalizeInteger(source.authorityJobWaitTimeoutMs ?? source.waitTimeoutMs, 0, 0, 3600000),
...overrides,
@@ -271,6 +322,15 @@ async function callClient(client, methodNames = [], action = "request", payload
throw new Error(`Authority Jobs ${action} unavailable`);
}
async function callStreamClient(client, methodNames = [], payload = {}) {
for (const methodName of methodNames) {
if (typeof client?.[methodName] === "function") {
return await client[methodName](payload);
}
}
throw new Error("Authority Jobs stream unavailable");
}
function throwIfAborted(signal) {
if (signal?.aborted) {
throw signal.reason instanceof Error
@@ -331,6 +391,24 @@ export class AuthorityJobAdapter {
return normalizeAuthorityJobRecord(result?.job || result?.result || result);
}
async stream(jobId, options = {}) {
throwIfAborted(options.signal);
const id = normalizeRecordId(jobId);
if (!id) {
return normalizeAuthorityJobStream([], options);
}
const source = await callStreamClient(
this.client,
["stream", "streamJob", "watch", "watchJob"],
{
jobId: id,
id,
signal: options.signal,
},
);
return normalizeAuthorityJobStream(source, options);
}
async waitForCompletion(jobId, options = {}) {
throwIfAborted(options.signal);
const id = normalizeRecordId(jobId);