perf: optimize overview/tasks/polling/microservice-cache and improve frontend progressive loading

This commit is contained in:
Codex
2026-05-11 08:08:17 +00:00
parent 5a198baf77
commit 688376abc4
6 changed files with 521 additions and 144 deletions
+301 -49
View File
@@ -117,6 +117,7 @@ type TaskTerminalWaiter = (task: RawTaskRow | null) => void;
interface MicroserviceProxyCacheEntry {
expiresAt: number;
staleExpiresAt: number;
status: number;
contentType: string;
bodyText: string;
@@ -140,6 +141,7 @@ const operationPerformanceSamples: OperationPerformanceSample[] = [];
const maxPerformanceSamples = 3000;
const taskTerminalWaiters = new Map<string, Set<TaskTerminalWaiter>>();
const microserviceProxyCache = new Map<string, MicroserviceProxyCacheEntry>();
const microserviceProxyRefreshes = new Map<string, Promise<void>>();
let lastTaskSweepAt = 0;
let taskSweepInFlight: Promise<void> | null = null;
const microserviceProxyMaxBodyTextLength = 8 * 1024 * 1024;
@@ -1062,8 +1064,169 @@ async function getEvents(limit: number): Promise<ApiEvent[]> {
}));
}
async function getTasks(limit: number, statusFilter = "all", lite = false): Promise<ApiTask[]> {
function rowString(row: Record<string, unknown>, key: string): string {
const value = row[key];
return typeof value === "string" ? value : value === null || value === undefined ? "" : String(value);
}
function rowNumber(row: Record<string, unknown>, key: string): number | null {
const value = row[key];
const parsed = typeof value === "number" ? value : typeof value === "string" ? Number(value) : NaN;
return Number.isFinite(parsed) ? parsed : null;
}
function taskJsonSummary(row: Record<string, unknown>, prefix: "payload" | "result"): JsonValue {
const type = rowString(row, `${prefix}_type`);
if (type.length === 0) return prefix === "result" ? null : {};
const summary: Record<string, JsonValue> = { summaryOnly: true, type };
const fields = prefix === "payload"
? ["source", "serviceId", "method", "path", "mode", "targetBaseUrl", "timeoutMs", "targetProviderGatewayVersion", "providerGatewayVersion"]
: ["error", "reason", "message", "status", "exitCode", "code", "signal", "timeoutMs", "previousStatus", "mode", "policy", "targetProviderGatewayVersion", "providerGatewayVersion", "updaterContainerId"];
for (const field of fields) {
const value = row[`${prefix}_${field}`];
if (value !== null && value !== undefined && String(value).length > 0) {
summary[field] = typeof value === "number" || typeof value === "boolean" ? value : String(value);
}
}
const bodyChars = rowNumber(row, `${prefix}_body_text_chars`);
if (bodyChars !== null) summary.bodyText = `<omitted:${bodyChars} chars>`;
return summary;
}
function taskSummaryFromRow(row: Record<string, unknown>): ApiTask {
return {
id: String(row.id),
providerId: String(row.provider_id),
command: String(row.command),
status: String(row.status),
payload: taskJsonSummary(row, "payload"),
result: taskJsonSummary(row, "result"),
createdAt: row.created_at instanceof Date ? row.created_at.toISOString() : String(row.created_at),
updatedAt: row.updated_at instanceof Date ? row.updated_at.toISOString() : String(row.updated_at),
_summaryOnly: true,
} as ApiTask;
}
async function getTask(taskId: string): Promise<ApiTask | null> {
const rows = await sql<Array<Record<string, unknown>>>`
SELECT
id,
provider_id,
command,
status,
CASE
WHEN payload ? 'bodyText' THEN jsonb_set(payload - 'bodyText', '{bodyText}', to_jsonb(('<omitted:' || length(payload->>'bodyText')::text || ' chars>')::text))
ELSE payload
END AS payload,
CASE
WHEN result IS NOT NULL AND result ? 'bodyText' THEN jsonb_set(result - 'bodyText', '{bodyText}', to_jsonb(('<omitted:' || length(result->>'bodyText')::text || ' chars>')::text))
ELSE result
END AS result,
created_at,
updated_at
FROM unidesk_tasks
WHERE id = ${taskId}
LIMIT 1
`;
const row = rows[0];
if (row === undefined) return null;
return {
id: String(row.id),
providerId: String(row.provider_id),
command: String(row.command),
status: String(row.status),
payload: compactJson(row.payload ?? {}),
result: compactJson(row.result ?? null),
createdAt: row.created_at instanceof Date ? row.created_at.toISOString() : String(row.created_at),
updatedAt: row.updated_at instanceof Date ? row.updated_at.toISOString() : String(row.updated_at),
};
}
async function getTasks(limit: number, statusFilter = "all", lite = false, summary = false): Promise<ApiTask[]> {
await maybeMarkStaleTasksFailed();
if (summary && !lite) {
const rows = statusFilter === "pending"
? await sql<Array<Record<string, unknown>>>`
SELECT
id,
provider_id,
command,
status,
jsonb_typeof(payload) AS payload_type,
payload->>'source' AS "payload_source",
payload->>'serviceId' AS "payload_serviceId",
payload->>'method' AS "payload_method",
payload->>'path' AS "payload_path",
payload->>'mode' AS "payload_mode",
payload->>'targetBaseUrl' AS "payload_targetBaseUrl",
payload->>'timeoutMs' AS "payload_timeoutMs",
payload->>'targetProviderGatewayVersion' AS "payload_targetProviderGatewayVersion",
payload->>'providerGatewayVersion' AS "payload_providerGatewayVersion",
CASE WHEN payload ? 'bodyText' THEN length(payload->>'bodyText') ELSE NULL END AS payload_body_text_chars,
jsonb_typeof(result) AS result_type,
result->>'error' AS "result_error",
result->>'reason' AS "result_reason",
result->>'message' AS "result_message",
result->>'status' AS "result_status",
result->>'exitCode' AS "result_exitCode",
result->>'code' AS "result_code",
result->>'signal' AS "result_signal",
result->>'timeoutMs' AS "result_timeoutMs",
result->>'previousStatus' AS "result_previousStatus",
result->>'mode' AS "result_mode",
COALESCE(result->>'policy', result #>> '{plan,policy}') AS "result_policy",
COALESCE(result->>'targetProviderGatewayVersion', result #>> '{plan,targetProviderGatewayVersion}') AS "result_targetProviderGatewayVersion",
COALESCE(result->>'providerGatewayVersion', result #>> '{plan,providerGatewayVersion}') AS "result_providerGatewayVersion",
result->>'updaterContainerId' AS "result_updaterContainerId",
CASE WHEN result ? 'bodyText' THEN length(result->>'bodyText') ELSE NULL END AS result_body_text_chars,
created_at,
updated_at
FROM unidesk_tasks
WHERE status IN ('queued', 'dispatched', 'running')
ORDER BY updated_at DESC
LIMIT ${limit}
`
: await sql<Array<Record<string, unknown>>>`
SELECT
id,
provider_id,
command,
status,
jsonb_typeof(payload) AS payload_type,
payload->>'source' AS "payload_source",
payload->>'serviceId' AS "payload_serviceId",
payload->>'method' AS "payload_method",
payload->>'path' AS "payload_path",
payload->>'mode' AS "payload_mode",
payload->>'targetBaseUrl' AS "payload_targetBaseUrl",
payload->>'timeoutMs' AS "payload_timeoutMs",
payload->>'targetProviderGatewayVersion' AS "payload_targetProviderGatewayVersion",
payload->>'providerGatewayVersion' AS "payload_providerGatewayVersion",
CASE WHEN payload ? 'bodyText' THEN length(payload->>'bodyText') ELSE NULL END AS payload_body_text_chars,
jsonb_typeof(result) AS result_type,
result->>'error' AS "result_error",
result->>'reason' AS "result_reason",
result->>'message' AS "result_message",
result->>'status' AS "result_status",
result->>'exitCode' AS "result_exitCode",
result->>'code' AS "result_code",
result->>'signal' AS "result_signal",
result->>'timeoutMs' AS "result_timeoutMs",
result->>'previousStatus' AS "result_previousStatus",
result->>'mode' AS "result_mode",
COALESCE(result->>'policy', result #>> '{plan,policy}') AS "result_policy",
COALESCE(result->>'targetProviderGatewayVersion', result #>> '{plan,targetProviderGatewayVersion}') AS "result_targetProviderGatewayVersion",
COALESCE(result->>'providerGatewayVersion', result #>> '{plan,providerGatewayVersion}') AS "result_providerGatewayVersion",
result->>'updaterContainerId' AS "result_updaterContainerId",
CASE WHEN result ? 'bodyText' THEN length(result->>'bodyText') ELSE NULL END AS result_body_text_chars,
created_at,
updated_at
FROM unidesk_tasks
ORDER BY updated_at DESC
LIMIT ${limit}
`;
return rows.map(taskSummaryFromRow);
}
const rows = statusFilter === "pending"
? lite
? await sql<Array<Record<string, unknown>>>`
@@ -1161,22 +1324,39 @@ async function getPgdataUsage(): Promise<JsonValue> {
}
async function getOverview(): Promise<JsonValue> {
const nodes = await getNodes();
const pendingTasks = await countPendingTasks();
const dockerStatuses = await getNodeDockerStatuses();
const systemStatuses = await getNodeSystemStatuses(1);
const pgdata = await getPgdataUsage();
const online = nodes.filter((node) => node.status === "online").length;
const [nodeRows, dockerRows, systemRows, pendingTasks, pgdata] = await Promise.all([
sql<Array<{ node_count: string | number; online_node_count: string | number }>>`
SELECT
count(*)::int AS node_count,
count(*) FILTER (WHERE status = 'online')::int AS online_node_count
FROM unidesk_nodes
`,
sql<Array<{ docker_status_node_count: string | number }>>`
SELECT count(*) FILTER (WHERE d.status IS NOT NULL)::int AS docker_status_node_count
FROM unidesk_nodes n
LEFT JOIN unidesk_node_docker_status d ON d.provider_id = n.provider_id
`,
sql<Array<{ system_status_node_count: string | number }>>`
SELECT count(*) FILTER (WHERE s.status IS NOT NULL)::int AS system_status_node_count
FROM unidesk_nodes n
LEFT JOIN unidesk_node_system_status s ON s.provider_id = n.provider_id
`,
countPendingTasks(),
getPgdataUsage(),
]);
const nodeRow = nodeRows[0];
const dockerRow = dockerRows[0];
const systemRow = systemRows[0];
return {
service: "unidesk-core",
ok: true,
dbReady,
pgdata,
uptimeSeconds: Math.floor((Date.now() - serviceStartedAt.getTime()) / 1000),
nodeCount: nodes.length,
onlineNodeCount: online,
dockerStatusNodeCount: dockerStatuses.filter((item) => item.dockerStatus !== null).length,
systemStatusNodeCount: systemStatuses.filter((item) => item.current !== null).length,
nodeCount: Number(nodeRow?.node_count ?? 0),
onlineNodeCount: Number(nodeRow?.online_node_count ?? 0),
dockerStatusNodeCount: Number(dockerRow?.docker_status_node_count ?? 0),
systemStatusNodeCount: Number(systemRow?.system_status_node_count ?? 0),
pendingTaskCount: pendingTasks,
taskPendingTimeoutMs: config.taskPendingTimeoutMs,
activeSocketCount: activeProviders.size,
@@ -1432,12 +1612,27 @@ function responseFromMicroserviceResult(task: Awaited<ReturnType<typeof rawTask>
}
function microserviceCacheTtlMs(serviceId: string, targetPath: string): number {
if (serviceId === "pipeline" && targetPath === "/api/snapshot") return 6_000;
if (serviceId === "pipeline" && targetPath.startsWith("/api/oa-event-flow/")) return 20_000;
if (serviceId === "pipeline" && targetPath.startsWith("/api/model-quota/")) return 60_000;
if (serviceId === "pipeline" && targetPath.startsWith("/api/node-control/runs/")) return 6_000;
if (serviceId === "pipeline" && targetPath.startsWith("/api/runs/")) return 6_000;
if (serviceId === "findjob" && (targetPath === "/api/summary" || targetPath === "/api/jobs" || targetPath === "/api/drafts")) return 8_000;
if (serviceId === "met-nonlinear" && (targetPath === "/api/images" || targetPath === "/api/projects")) return 15_000;
if (serviceId === "met-nonlinear" && (targetPath === "/api/queue" || targetPath === "/api/summary" || targetPath === "/api/history")) return 1_500;
if (serviceId === "met-nonlinear" && (targetPath === "/api/queue" || targetPath === "/api/summary" || targetPath === "/api/history")) return 5_000;
if (serviceId === "codex-queue" && targetPath.includes("/transcript")) return 1_000;
return 750;
}
function microserviceCacheStaleMs(serviceId: string, targetPath: string): number {
if (serviceId === "pipeline" && targetPath.startsWith("/api/model-quota/")) return 5 * 60_000;
if (serviceId === "pipeline") return 45_000;
if (serviceId === "findjob") return 60_000;
if (serviceId === "met-nonlinear" && (targetPath === "/api/images" || targetPath === "/api/projects")) return 5 * 60_000;
if (serviceId === "met-nonlinear") return 45_000;
return 5_000;
}
function providerMicroserviceCacheTtlMs(serviceId: string, targetPath: string): number {
if (serviceId === "met-nonlinear" && (targetPath === "/api/images" || targetPath === "/api/projects")) return 60_000;
if (serviceId === "met-nonlinear" && targetPath === "/api/history") return 10_000;
@@ -1451,22 +1646,38 @@ function microserviceCacheKey(service: MicroserviceConfig, method: string, targe
return JSON.stringify([service.id, method, targetPath, proxyOptions.query, proxyOptions.jsonArrayLimits]);
}
function readMicroserviceCache(key: string): Response | null {
const entry = microserviceProxyCache.get(key);
if (entry === undefined) return null;
if (entry.expiresAt <= Date.now()) {
microserviceProxyCache.delete(key);
return null;
}
function responseFromMicroserviceCache(entry: MicroserviceProxyCacheEntry, state: "hit" | "stale"): Response {
return new Response(entry.bodyText, {
status: entry.status,
headers: {
"content-type": entry.contentType,
"x-unidesk-cache": "hit",
"x-unidesk-cache": state,
},
});
}
function readMicroserviceCache(key: string): Response | null {
const entry = microserviceProxyCache.get(key);
if (entry === undefined) return null;
if (entry.staleExpiresAt <= Date.now()) {
microserviceProxyCache.delete(key);
return null;
}
if (entry.expiresAt <= Date.now()) return null;
return responseFromMicroserviceCache(entry, "hit");
}
function readStaleMicroserviceCache(key: string): Response | null {
const entry = microserviceProxyCache.get(key);
if (entry === undefined) return null;
if (entry.staleExpiresAt <= Date.now()) {
microserviceProxyCache.delete(key);
return null;
}
if (entry.expiresAt > Date.now()) return null;
return responseFromMicroserviceCache(entry, "stale");
}
async function cacheableResponseSnapshot(response: Response): Promise<MicroserviceProxyCacheEntry | null> {
if (response.status < 200 || response.status >= 300) return null;
if (response.headers.get("x-unidesk-response-truncated") === "true") return null;
@@ -1474,6 +1685,7 @@ async function cacheableResponseSnapshot(response: Response): Promise<Microservi
if (bodyText.length > 2 * 1024 * 1024) return null;
return {
expiresAt: 0,
staleExpiresAt: 0,
status: response.status,
contentType: response.headers.get("content-type") ?? "application/octet-stream",
bodyText,
@@ -1482,7 +1694,9 @@ async function cacheableResponseSnapshot(response: Response): Promise<Microservi
function rememberMicroserviceCache(key: string, ttlMs: number, entry: MicroserviceProxyCacheEntry | null): void {
if (entry === null || ttlMs <= 0) return;
const keyParts = JSON.parse(key) as string[];
entry.expiresAt = Date.now() + ttlMs;
entry.staleExpiresAt = entry.expiresAt + microserviceCacheStaleMs(String(keyParts[0] ?? ""), String(keyParts[2] ?? ""));
microserviceProxyCache.set(key, entry);
if (microserviceProxyCache.size > 300) {
const now = Date.now();
@@ -1550,6 +1764,56 @@ async function directMicroserviceResponse(
}
}
async function fetchMicroserviceUpstreamResponse(
service: MicroserviceConfig,
method: string,
targetPath: string,
proxyOptions: { query: string; jsonArrayLimits: Record<string, JsonValue> },
requestHeaders: Record<string, JsonValue>,
bodyText: string,
): Promise<Response> {
if (canDirectProxyMicroservice(service)) {
return directMicroserviceResponse(service, method, targetPath, proxyOptions, requestHeaders, bodyText);
}
if (!(await providerSupports(service.providerId, "microservice.http"))) {
return jsonResponse({ ok: false, error: `provider does not declare microservice.http capability: ${service.providerId}` }, 409);
}
const { taskId, providerOnline } = await createAndSendTask(service.providerId, "microservice.http", {
source: "microservice-frontend-proxy",
serviceId: service.id,
method,
targetBaseUrl: service.backend.nodeBaseUrl,
path: targetPath,
query: proxyOptions.query,
requestHeaders,
bodyText,
jsonArrayLimits: proxyOptions.jsonArrayLimits,
timeoutMs: service.backend.timeoutMs,
cacheTtlMs: providerMicroserviceCacheTtlMs(service.id, targetPath),
});
if (!providerOnline) return jsonResponse({ ok: false, error: `provider is offline: ${service.providerId}`, taskId }, 503);
const task = await waitForTaskTerminal(taskId, service.backend.timeoutMs + 3000);
return responseFromMicroserviceResult(task);
}
function refreshMicroserviceCacheInBackground(
cacheKey: string,
ttlMs: number,
fetchResponse: () => Promise<Response>,
): void {
if (microserviceProxyRefreshes.has(cacheKey)) return;
const refresh = fetchResponse()
.then((response) => cacheableResponseSnapshot(response))
.then((entry) => rememberMicroserviceCache(cacheKey, ttlMs, entry))
.catch((error) => {
logger("warn", "microservice_cache_revalidate_failed", { cacheKey, error: errorToJson(error) });
})
.finally(() => {
microserviceProxyRefreshes.delete(cacheKey);
});
microserviceProxyRefreshes.set(cacheKey, refresh);
}
async function microserviceRoute(req: Request, url: URL): Promise<Response> {
const rest = url.pathname.slice("/api/microservices/".length);
const slashIndex = rest.indexOf("/");
@@ -1580,12 +1844,9 @@ async function microserviceRoute(req: Request, url: URL): Promise<Response> {
if (!isMicroservicePathAllowed(service, targetPath)) {
return jsonResponse({ ok: false, error: "microservice path is not allowed", serviceId, targetPath }, 403);
}
const directProxy = canDirectProxyMicroservice(service);
if (!directProxy && !(await providerSupports(service.providerId, "microservice.http"))) {
return jsonResponse({ ok: false, error: `provider does not declare microservice.http capability: ${service.providerId}` }, 409);
}
const proxyOptions = readMicroserviceArrayLimits(url);
const cacheKey = microserviceCacheKey(service, method, targetPath, proxyOptions);
const cacheTtlMs = microserviceCacheTtlMs(service.id, targetPath);
if (method === "GET" || method === "HEAD") {
const cached = readMicroserviceCache(cacheKey);
if (cached !== null) return cached;
@@ -1599,32 +1860,15 @@ async function microserviceRoute(req: Request, url: URL): Promise<Response> {
const requestHeaders: Record<string, JsonValue> = {};
const contentType = req.headers.get("content-type");
if (contentType !== null) requestHeaders["content-type"] = contentType.slice(0, 200);
if (directProxy) {
const response = await directMicroserviceResponse(service, method, targetPath, proxyOptions, requestHeaders, bodyText);
if (method === "GET" || method === "HEAD") {
rememberMicroserviceCache(cacheKey, microserviceCacheTtlMs(service.id, targetPath), await cacheableResponseSnapshot(response));
}
return response;
}
const { taskId, providerOnline } = await createAndSendTask(service.providerId, "microservice.http", {
source: "microservice-frontend-proxy",
serviceId: service.id,
method,
targetBaseUrl: service.backend.nodeBaseUrl,
path: targetPath,
query: proxyOptions.query,
requestHeaders,
bodyText,
jsonArrayLimits: proxyOptions.jsonArrayLimits,
timeoutMs: service.backend.timeoutMs,
cacheTtlMs: providerMicroserviceCacheTtlMs(service.id, targetPath),
});
if (!providerOnline) return jsonResponse({ ok: false, error: `provider is offline: ${service.providerId}`, taskId }, 503);
const task = await waitForTaskTerminal(taskId, service.backend.timeoutMs + 3000);
const response = responseFromMicroserviceResult(task);
if (method === "GET" || method === "HEAD") {
rememberMicroserviceCache(cacheKey, microserviceCacheTtlMs(service.id, targetPath), await cacheableResponseSnapshot(response));
const stale = readStaleMicroserviceCache(cacheKey);
if (stale !== null) {
refreshMicroserviceCacheInBackground(cacheKey, cacheTtlMs, () => fetchMicroserviceUpstreamResponse(service, method, targetPath, proxyOptions, requestHeaders, bodyText));
return stale;
}
}
const response = await fetchMicroserviceUpstreamResponse(service, method, targetPath, proxyOptions, requestHeaders, bodyText);
if (method === "GET" || method === "HEAD") rememberMicroserviceCache(cacheKey, cacheTtlMs, await cacheableResponseSnapshot(response));
return response;
}
@@ -1939,9 +2183,17 @@ async function routeInner(req: Request, server: Server<WsData>): Promise<Respons
if (url.pathname === "/api/nodes/system-status") return jsonResponse({ ok: true, systemStatuses: await withPerformanceOperation("core", "node_system_status", url.search, () => getNodeSystemStatuses(readLimit(url, 60))) });
if (url.pathname === "/api/nodes/docker-status") return jsonResponse({ ok: true, dockerStatuses: await withPerformanceOperation("core", "node_docker_status", url.pathname, () => getNodeDockerStatuses()) });
if (url.pathname === "/api/events") return jsonResponse({ ok: true, events: await withPerformanceOperation("core", "events", url.search, () => getEvents(readLimit(url, 100))) });
if (url.pathname.startsWith("/api/tasks/") && req.method === "GET") {
const taskId = decodeURIComponent(url.pathname.slice("/api/tasks/".length));
if (taskId.length === 0 || taskId.includes("/")) return jsonResponse({ ok: false, error: "invalid task id" }, 400);
const task = await withPerformanceOperation("core", "task_detail", taskId, () => getTask(taskId));
if (task === null) return jsonResponse({ ok: false, error: `task not found: ${taskId}` }, 404);
return jsonResponse({ ok: true, task });
}
if (url.pathname === "/api/tasks") {
const lite = ["1", "true", "yes"].includes((url.searchParams.get("lite") ?? "").toLowerCase());
return jsonResponse({ ok: true, tasks: await withPerformanceOperation("core", "tasks", url.search, () => getTasks(readLimit(url, 100), url.searchParams.get("status") ?? "all", lite)) });
const summary = ["1", "true", "yes"].includes((url.searchParams.get("summary") ?? "").toLowerCase());
return jsonResponse({ ok: true, tasks: await withPerformanceOperation("core", "tasks", url.search, () => getTasks(readLimit(url, 100), url.searchParams.get("status") ?? "all", lite, summary)) });
}
if (url.pathname === "/api/microservices") return jsonResponse({ ok: true, microservices: await withPerformanceOperation("core", "microservices", url.pathname, () => getMicroservices()) });
if (url.pathname === "/api/performance") return jsonResponse(await getPerformance());
File diff suppressed because one or more lines are too long
+73 -16
View File
@@ -49,6 +49,30 @@ const fastCodexQueueService = {
},
};
function isDocumentVisible(): boolean {
return typeof document === "undefined" || document.visibilityState !== "hidden";
}
function shellRefreshIntervalMs(moduleId: string, tabId: string): number {
if (moduleId === "ops" && tabId === "status") return 5_000;
if (moduleId === "nodes" && tabId === "monitor") return 5_000;
if (moduleId === "tasks" && (tabId === "dispatch" || tabId === "pending")) return 5_000;
if (moduleId === "nodes" || moduleId === "ops") return 10_000;
if (moduleId === "apps") return 15_000;
if (moduleId === "tasks") return 15_000;
return 30_000;
}
async function loadTaskRawData(task: any): Promise<any> {
if (!task?._summaryOnly || !task?.id) return task;
const result = await requestJson(`${cfg.apiBaseUrl}/tasks/${encodeURIComponent(String(task.id))}`);
return result?.task || task;
}
function taskRawButtonData(task: any): any {
return task?._summaryOnly ? { ...task, _loadRaw: () => loadTaskRawData(task) } : task;
}
function fmtDate(value: any): string {
if (!value) return "--";
const date = new Date(value);
@@ -262,7 +286,7 @@ function taskUpgradeSource(task: any): string {
function taskUpgradePolicy(task: any): string {
const result = taskResult(task);
const plan = result.plan && typeof result.plan === "object" && !Array.isArray(result.plan) ? result.plan as AnyRecord : {};
const policy = plan.policy;
const policy = result.policy ?? plan.policy;
return typeof policy === "string" && policy.length > 0 ? policy : "--";
}
@@ -349,12 +373,29 @@ function Panel({ title, eyebrow, actions, children, className }: AnyRecord) {
}
function RawButton({ title, data, onOpen, testId }: AnyRecord) {
const [loading, setLoading] = useState(false);
const loadData = data && typeof data === "object" && typeof data._loadRaw === "function" ? data._loadRaw : null;
async function open(): Promise<void> {
if (!loadData) {
onOpen(title, data);
return;
}
setLoading(true);
try {
onOpen(title, await loadData());
} catch (err) {
onOpen(title, { ok: false, error: errorMessage(err, "读取原始 JSON 失败"), fallback: data });
} finally {
setLoading(false);
}
}
return h("button", {
type: "button",
className: "ghost-btn",
"data-testid": testId,
onClick: () => onOpen(title, data),
}, "查看原始JSON");
disabled: loading,
onClick: () => void open(),
}, loading ? "读取中" : "查看原始JSON");
}
function RawDialog({ raw, onClose }: AnyRecord) {
@@ -1227,7 +1268,7 @@ function UpgradeRecordsTable({ records, onRaw, compact = false }: AnyRecord) {
h("td", null, h("span", { className: "version-chip" }, taskUpgradeVersion(task))),
h("td", null, h("span", { className: `upgrade-outcome ${String(task.status || "").toLowerCase()}` }, taskUpgradeOutcome(task))),
h("td", null, fmtDate(task.updatedAt)),
h("td", null, h(RawButton, { title: `Provider Upgrade Task ${task.id}`, data: task, onOpen: onRaw })),
h("td", null, h(RawButton, { title: `Provider Upgrade Task ${task.id}`, data: taskRawButtonData(task), onOpen: onRaw })),
))),
));
}
@@ -1622,7 +1663,7 @@ function TaskCompactRow({ task, onRaw }: AnyRecord) {
h(StatusBadge, { status: task.status }),
h("div", null, h("strong", null, task.command), h("code", null, task.id)),
h("span", null, isPendingTask(task) ? `已等待 ${fmtRelativeAge(task.updatedAt)}` : `耗时 ${fmtDuration(taskElapsedSeconds(task) ?? 0)}`),
h(RawButton, { title: `Task ${task.id}`, data: task, onOpen: onRaw }),
h(RawButton, { title: `Task ${task.id}`, data: taskRawButtonData(task), onOpen: onRaw }),
);
}
@@ -1676,7 +1717,7 @@ function TaskPendingPage({ tasks, onRaw }: AnyRecord) {
h("td", null, h("code", null, task.providerId)),
h("td", null, fmtRelativeAge(task.updatedAt)),
h("td", null, h(DataSummary, { data: task.payload })),
h("td", null, h(RawButton, { title: `Pending Task ${task.id}`, data: task, onOpen: onRaw })),
h("td", null, h(RawButton, { title: `Pending Task ${task.id}`, data: taskRawButtonData(task), onOpen: onRaw })),
))),
)),
),
@@ -1697,7 +1738,7 @@ function TaskHistoryPage({ tasks, onRaw }: AnyRecord) {
h("td", null, h(DataSummary, { data: task.payload })),
h("td", null, h(TaskDiagnosticCell, { task })),
h("td", null, fmtDate(task.updatedAt)),
h("td", null, h(RawButton, { title: `Task ${task.id}`, data: task, onOpen: onRaw })),
h("td", null, h(RawButton, { title: `Task ${task.id}`, data: taskRawButtonData(task), onOpen: onRaw })),
))),
)),
),
@@ -1712,7 +1753,7 @@ function TaskResultsPage({ tasks, onRaw }: AnyRecord) {
h("div", { className: "node-card-head" }, h("strong", null, task.command), h(StatusBadge, { status: task.status })),
h("code", null, task.id),
h(DataSummary, { data: task.result, empty: "无执行输出" }),
h(RawButton, { title: `Task Result ${task.id}`, data: task, onOpen: onRaw }),
h(RawButton, { title: `Task Result ${task.id}`, data: taskRawButtonData(task), onOpen: onRaw }),
))),
);
}
@@ -1802,6 +1843,7 @@ function Shell({ session, onLogout }: AnyRecord) {
const [clock, setClock] = useState(new Date());
const [raw, setRaw] = useState(null);
const [railCollapsed, setRailCollapsed] = useState(false);
const refreshInFlightRef = React.useRef(false);
const module = ROUTE_REGISTRY.moduleById[activeModule] || ROUTE_REGISTRY.modules[0];
const activeTab = activeTabs[activeModule] || DEFAULT_ACTIVE_TABS[activeModule] || module.tabs[0].id;
@@ -1824,28 +1866,30 @@ function Shell({ session, onLogout }: AnyRecord) {
}] : [];
async function refresh(): Promise<void> {
if (refreshInFlightRef.current) return;
refreshInFlightRef.current = true;
try {
const requests: Array<[string, Promise<any>]> = [];
const add = (key: string, path: string): void => {
requests.push([key, requestJson(path)]);
};
const isOverview = activeModule === "ops" && activeTab === "status";
const needsOverviewSummary = activeModule !== "apps";
const needsOverviewSummary = isOverview || (activeModule === "config" && activeTab === "topology");
const needsNodes = isOverview || activeModule === "nodes" || (activeModule === "tasks" && activeTab === "dispatch");
const needsMicroservices = activeModule === "apps" && activeTab !== "codex-queue";
if (needsOverviewSummary) add("overview", `${cfg.apiBaseUrl}/overview`);
if (needsNodes) add("nodes", `${cfg.apiBaseUrl}/nodes`);
if (activeModule === "nodes" && activeTab === "monitor") {
add("systemStatuses", `${cfg.apiBaseUrl}/nodes/system-status?limit=60`);
add("tasks", `${cfg.apiBaseUrl}/tasks?limit=120`);
add("tasks", `${cfg.apiBaseUrl}/tasks?limit=120&summary=1`);
} else if (activeModule === "nodes" && activeTab === "docker") {
add("dockerStatuses", `${cfg.apiBaseUrl}/nodes/docker-status`);
} else if (activeModule === "nodes" && activeTab === "gateway") {
add("tasks", `${cfg.apiBaseUrl}/tasks?limit=300`);
add("tasks", `${cfg.apiBaseUrl}/tasks?limit=300&summary=1`);
} else if (activeModule === "tasks" && activeTab === "pending") {
add("pendingTasks", `${cfg.apiBaseUrl}/tasks?status=pending&limit=100`);
add("pendingTasks", `${cfg.apiBaseUrl}/tasks?status=pending&limit=100&summary=1`);
} else if (activeModule === "tasks" && (activeTab === "history" || activeTab === "results")) {
add("tasks", `${cfg.apiBaseUrl}/tasks?limit=300`);
add("tasks", `${cfg.apiBaseUrl}/tasks?limit=300&summary=1`);
} else if (isOverview) {
add("tasks", `${cfg.apiBaseUrl}/tasks?limit=8&lite=1`);
add("pendingTasks", `${cfg.apiBaseUrl}/tasks?status=pending&limit=20&lite=1`);
@@ -1873,13 +1917,26 @@ function Shell({ session, onLogout }: AnyRecord) {
} catch (err) {
setConnection({ ok: false, text: errorMessage(err, "连接失败") });
if ((err as { status?: number }).status === 401) onLogout(false);
} finally {
refreshInFlightRef.current = false;
}
}
useEffect(() => {
refresh();
const timer = setInterval(refresh, 5000);
return () => clearInterval(timer);
const tick = (): void => {
if (!isDocumentVisible()) return;
void refresh();
};
tick();
const timer = setInterval(tick, shellRefreshIntervalMs(activeModule, activeTab));
const onVisible = (): void => {
if (isDocumentVisible()) tick();
};
document.addEventListener("visibilitychange", onVisible);
return () => {
clearInterval(timer);
document.removeEventListener("visibilitychange", onVisible);
};
}, [activeModule, activeTab]);
useEffect(() => {
+49 -14
View File
@@ -11,6 +11,16 @@ const useState: any = React.useState;
const codexTranscriptChunkLimit = 120;
const codexInitialTaskLimit = 24;
const codexMoreTaskLimit = 48;
const codexLocalReadRetention = 300;
const queueErrorPreviewLength = 1200;
function isDocumentVisible(): boolean {
return typeof document === "undefined" || document.visibilityState !== "hidden";
}
function errorText(error: unknown, fallback = "操作失败"): string {
return errorMessage(error, fallback);
}
function fmtDate(value: any): string {
if (!value) return "--";
@@ -53,6 +63,7 @@ async function requestJson(path: string, options: AnyRecord = {}): Promise<any>
retryInvalidJson: 1,
invalidJsonPrefix: "Codex Queue 返回了无效 JSON",
invalidJsonPreview: true,
responsePreviewLength: queueErrorPreviewLength,
...options,
});
}
@@ -678,19 +689,33 @@ function loadLocalReadAt(): AnyRecord {
if (typeof window === "undefined") return {};
try {
const parsed = JSON.parse(window.localStorage.getItem(codexReadAtStorageKey) || "{}");
return parsed && typeof parsed === "object" && !Array.isArray(parsed) ? parsed : {};
return parsed && typeof parsed === "object" && !Array.isArray(parsed) ? trimLocalReadAt(parsed) : {};
} catch {
return {};
}
}
function saveLocalReadAt(readAtByTask: AnyRecord): void {
if (typeof window === "undefined") return;
function trimLocalReadAt(readAtByTask: AnyRecord): AnyRecord {
const entries = Object.entries(readAtByTask || {})
.filter(([, value]) => typeof value === "string" && value.length > 0)
.sort((left, right) => {
const leftTime = Date.parse(String(left[1] || ""));
const rightTime = Date.parse(String(right[1] || ""));
return (Number.isFinite(rightTime) ? rightTime : 0) - (Number.isFinite(leftTime) ? leftTime : 0);
})
.slice(0, codexLocalReadRetention);
return Object.fromEntries(entries);
}
function saveLocalReadAt(readAtByTask: AnyRecord): AnyRecord {
const retained = trimLocalReadAt(readAtByTask);
if (typeof window === "undefined") return retained;
try {
window.localStorage.setItem(codexReadAtStorageKey, JSON.stringify(readAtByTask));
window.localStorage.setItem(codexReadAtStorageKey, JSON.stringify(retained));
} catch {
// Best-effort fallback only; backend readAt remains authoritative when deployed.
}
return retained;
}
function applyLocalReadState(task: any, readAtByTask: AnyRecord): any {
@@ -1459,8 +1484,7 @@ export function CodexQueuePage({ microservices, onRaw, apiBaseUrl = "/api", init
setLocalReadAt((previous: AnyRecord) => {
const next = { ...(previous || {}) };
for (const id of ids) next[id] = readAt;
saveLocalReadAt(next);
return next;
return saveLocalReadAt(next);
});
}
@@ -1854,7 +1878,7 @@ export function CodexQueuePage({ microservices, onRaw, apiBaseUrl = "/api", init
startedAt: new Date(Date.now() - queueMs),
});
}
if (nextId) void ensureTraceSummary(nextId, true, trackLoad ? startedAt : undefined, trackLoad ? queueMs : undefined).catch((err) => setError(errorMessage(err, "加载 Codex Trace Summary 失败")));
if (nextId) void ensureTraceSummary(nextId, true, trackLoad ? startedAt : undefined, trackLoad ? queueMs : undefined).catch((err) => setError(errorText(err, "加载 Codex Trace Summary 失败")));
else {
detailLoadTokenRef.current += 1;
setSelectedTask(null);
@@ -1902,7 +1926,7 @@ export function CodexQueuePage({ microservices, onRaw, apiBaseUrl = "/api", init
};
});
} catch (err) {
setError(errorMessage(err, "加载更早 Codex tasks 失败"));
setError(errorText(err, "加载更早 Codex tasks 失败"));
} finally {
loadMoreInFlightRef.current = false;
setLoadingMoreTasks(false);
@@ -1922,7 +1946,7 @@ export function CodexQueuePage({ microservices, onRaw, apiBaseUrl = "/api", init
try {
await action();
} catch (err) {
setError(errorMessage(err, message));
setError(errorText(err, message));
} finally {
setBusy(false);
}
@@ -1955,7 +1979,7 @@ export function CodexQueuePage({ microservices, onRaw, apiBaseUrl = "/api", init
setNotice(`已复制任务 ID${taskId}`);
window.setTimeout(() => setCopiedTaskId((value: string) => value === taskId ? "" : value), 1600);
} catch (err) {
setError(`复制任务 ID 失败:${errorMessage(err)}`);
setError(`复制任务 ID 失败:${errorText(err)}`);
}
}
@@ -2177,7 +2201,7 @@ export function CodexQueuePage({ microservices, onRaw, apiBaseUrl = "/api", init
if (row) setSelectedTask(row);
else setSelectedTask(null);
}
void load(taskId).catch((err) => setError(errorMessage(err, "切换 Codex session 失败")));
void load(taskId).catch((err) => setError(errorText(err, "切换 Codex session 失败")));
}
function selectTaskFromSidebar(taskId: string): void {
@@ -2195,10 +2219,21 @@ export function CodexQueuePage({ microservices, onRaw, apiBaseUrl = "/api", init
useEffect(() => {
if (!service) return undefined;
const tick = (): void => {
if (!isDocumentVisible()) return;
void load(selectedIdRef.current, false).catch((err) => setError(errorText(err, "Codex Queue 轮询失败")));
};
const timer = window.setInterval(() => {
void load(selectedIdRef.current, false).catch((err) => setError(errorMessage(err, "Codex Queue 轮询失败")));
tick();
}, 1500);
return () => window.clearInterval(timer);
const onVisible = (): void => {
if (isDocumentVisible()) tick();
};
document.addEventListener("visibilitychange", onVisible);
return () => {
window.clearInterval(timer);
document.removeEventListener("visibilitychange", onVisible);
};
}, [service?.id, selectedQueueId]);
useEffect(() => {
@@ -2211,7 +2246,7 @@ export function CodexQueuePage({ microservices, onRaw, apiBaseUrl = "/api", init
const key = `${taskId}:${updatedAt || "unknown"}`;
if (autoTraceLoadKeysRef.current.has(key)) return;
autoTraceLoadKeysRef.current.add(key);
void ensureTraceSummary(taskId, true).catch((err) => setError(errorMessage(err, "自动加载 Trace Summary 失败")));
void ensureTraceSummary(taskId, true).catch((err) => setError(errorText(err, "自动加载 Trace Summary 失败")));
}, [service?.id, selectedTask?.id, selectedTask?.updatedAt, selectedTask?._traceSummaryUpdatedAt, selectedTask?._traceSummaryLoaded, selectedDetailLoading]);
const taskListContent = tasks.length === 0 ? h(EmptyState, { title: "队列为空", text: "提交一个任务后,Codex 会串行执行并保存输出。" }) : [
+12 -3
View File
@@ -57,6 +57,7 @@ const clientConfig = JSON.stringify({
const indexHtmlTemplate = readFileSync(join(publicDir, "index.html"), "utf8");
const indexHtmlRootMarker = '<div id="root" data-config="__UNIDESK_CONFIG__"></div>';
const codexQueueOverviewCache = new Map<string, { at: number; payload: JsonValue; text: string }>();
const codexQueueOverviewRefreshes = new Map<string, Promise<JsonValue | null>>();
const codexQueueOverviewCacheTtlMs = 10_000;
const defaultCodexQueueOverviewPath = "/api/tasks/overview?limit=24&transcriptLimit=3&compact=1&afterSeq=0&preferId=";
@@ -71,6 +72,17 @@ function cachedCodexQueueOverview(pathWithQuery: string, maxAgeMs = codexQueueOv
}
async function refreshCodexQueueOverview(pathWithQuery: string, timeoutMs = 800): Promise<JsonValue | null> {
const existing = codexQueueOverviewRefreshes.get(pathWithQuery);
if (existing !== undefined) return existing;
const refresh = refreshCodexQueueOverviewUncached(pathWithQuery, timeoutMs)
.finally(() => {
codexQueueOverviewRefreshes.delete(pathWithQuery);
});
codexQueueOverviewRefreshes.set(pathWithQuery, refresh);
return refresh;
}
async function refreshCodexQueueOverviewUncached(pathWithQuery: string, timeoutMs = 800): Promise<JsonValue | null> {
const started = performance.now();
try {
const response = await fetch(`http://codex-queue:4222${pathWithQuery}`, {
@@ -123,9 +135,6 @@ async function spaShellHtml(req: Request, pathname: string): Promise<string> {
}
refreshCodexQueueOverview(defaultCodexQueueOverviewPath, 2_000).catch(() => undefined);
setInterval(() => {
refreshCodexQueueOverview(defaultCodexQueueOverviewPath, 2_000).catch(() => undefined);
}, 5_000);
const requestPerformanceSamples: RequestPerformanceSample[] = [];
const operationPerformanceSamples: OperationPerformanceSample[] = [];
const maxPerformanceSamples = 3000;
+33 -9
View File
@@ -35,6 +35,10 @@ const pipelineGanttNodeColumnWidth = 72;
const pipelineGanttHeaderHeight = 64;
const pipelineGanttArrowTipInsetPx = 12;
function isDocumentVisible(): boolean {
return typeof document === "undefined" || document.visibilityState !== "hidden";
}
function pipelinePercent(value: any, fallback: number): number {
const number = Number.parseFloat(String(value || ""));
return Number.isFinite(number) ? number / 100 : fallback;
@@ -3448,12 +3452,12 @@ export function PipelinePage({ microservices, onRaw, apiBaseUrl = "/api" }: AnyR
loadRequestRef.current = requestId;
if (!silent) setState((prev: any) => ({ ...prev, loading: true, error: "" }));
try {
const snapshotQuery = `__unideskArrayLimit=registry.components:80,runs:${pipelineSnapshotRunLimit}&_=${Date.now()}`;
const snapshotQuery = `__unideskArrayLimit=registry.components:80,runs:${pipelineSnapshotRunLimit}`;
const [snapshot, oaDiagnostics, minimaxQuota] = await Promise.all([
requestJson(`${apiBaseUrl}/microservices/pipeline/proxy/api/snapshot?${snapshotQuery}`, { cache: "no-store" }),
requestJson(`${apiBaseUrl}/microservices/pipeline/proxy/api/oa-event-flow/diagnostics?_=${Date.now()}`, { cache: "no-store" })
requestJson(`${apiBaseUrl}/microservices/pipeline/proxy/api/oa-event-flow/diagnostics`, { cache: "no-store" })
.catch((error: unknown) => ({ ok: false, error: errorMessage(error, "OA event flow diagnostics failed") })),
requestJson(`${apiBaseUrl}/microservices/pipeline/proxy/api/model-quota/minimax?_=${Date.now()}`, { cache: "no-store" })
requestJson(`${apiBaseUrl}/microservices/pipeline/proxy/api/model-quota/minimax`, { cache: "no-store" })
.catch((error: unknown) => ({ ok: false, error: errorMessage(error, "MiniMax quota failed") })),
]);
if (requestId !== loadRequestRef.current) return;
@@ -3470,10 +3474,20 @@ export function PipelinePage({ microservices, onRaw, apiBaseUrl = "/api" }: AnyR
useEffect(() => {
load();
if (!service) return undefined;
const tick = (): void => {
if (isDocumentVisible()) load({ silent: true });
};
const timer = window.setInterval(() => {
load({ silent: true });
tick();
}, pipelineAutoRefreshMs);
return () => window.clearInterval(timer);
const onVisible = (): void => {
if (isDocumentVisible()) tick();
};
document.addEventListener("visibilitychange", onVisible);
return () => {
window.clearInterval(timer);
document.removeEventListener("visibilitychange", onVisible);
};
}, [service?.id, service?.runtime?.providerStatus, apiBaseUrl]);
const runtime = microserviceRuntime(service);
@@ -3587,8 +3601,8 @@ export function PipelinePage({ microservices, onRaw, apiBaseUrl = "/api" }: AnyR
}));
try {
const [details, runSummary] = await Promise.all([
requestJson(`${pipelineProxyPath(apiBaseUrl, `/api/node-control/runs/${encodeURIComponent(runId)}?tail=160&view=timeline`)}&_=${Date.now()}`, { cache: "no-store", strictJson: true }),
requestJson(`${pipelineProxyPath(apiBaseUrl, `/api/runs/${encodeURIComponent(runId)}`)}?_=${Date.now()}`, { cache: "no-store" }).catch((error: unknown) => ({ ok: false, runSummaryError: errorMessage(error, "抓取评分失败") })),
requestJson(pipelineProxyPath(apiBaseUrl, `/api/node-control/runs/${encodeURIComponent(runId)}?tail=160&view=timeline`), { cache: "no-store", strictJson: true }),
requestJson(pipelineProxyPath(apiBaseUrl, `/api/runs/${encodeURIComponent(runId)}`), { cache: "no-store" }).catch((error: unknown) => ({ ok: false, runSummaryError: errorMessage(error, "抓取评分失败") })),
]);
if (requestId !== runDetailsRequestRef.current) return;
setRunDetails({ runId, scale, loading: false, error: "", details: { ...details, run: isRecord(runSummary?.run) ? runSummary.run : undefined, runSummaryError: runSummary?.runSummaryError }, fetchedAt: new Date() });
@@ -3653,10 +3667,20 @@ export function PipelinePage({ microservices, onRaw, apiBaseUrl = "/api" }: AnyR
return undefined;
}
void fetchRunDetails(activeRunId);
const tick = (): void => {
if (isDocumentVisible()) void fetchRunDetails(activeRunId, { silent: true });
};
const timer = window.setInterval(() => {
void fetchRunDetails(activeRunId, { silent: true });
tick();
}, pipelineAutoRefreshMs);
return () => window.clearInterval(timer);
const onVisible = (): void => {
if (isDocumentVisible()) tick();
};
document.addEventListener("visibilitychange", onVisible);
return () => {
window.clearInterval(timer);
document.removeEventListener("visibilitychange", onVisible);
};
}, [activeRunId, apiBaseUrl]);
async function fetchNodeDetails(runId = activeRunId, nodeId = selectedNodeId): Promise<void> {