fix: compact database task payload storage

This commit is contained in:
Codex
2026-05-17 04:43:43 +00:00
parent eaa9c50f88
commit 68a14cdd4e
7 changed files with 88 additions and 15 deletions
+1 -1
View File
@@ -39,7 +39,7 @@
"password": "unidesk_dev_password",
"name": "unidesk",
"volume": "unidesk_pgdata_10gb",
"volumeSize": "10GB"
"volumeSize": "15GB"
},
"providerGateway": {
"id": "main-server",
+2 -2
View File
@@ -4,7 +4,7 @@
## Services
- `database` 使用 `postgres:16-alpine`,数据保存到 named volume `unidesk_pgdata_10gb`,初始化 SQL 位于 `src/components/database/init/`
- `database` 使用 `postgres:16-alpine`,数据保存到 named volume `unidesk_pgdata_10gb`,当前容量预算为 15 GB,初始化 SQL 位于 `src/components/database/init/`
- `backend-core` 是无状态核心服务,提供 Docker 内网 REST API、provider ingress WebSocket、任务调度入口和数据库访问层。源码拆分为 15 个职责单一的模块(`index.ts` 只做路由和启动),模块结构见 `docs/reference/repo-tree.md`
- `frontend` 是唯一公开 Web 控制台,提供登录、从 TSX 转译出的 React 应用资产和到 backend-core 的同源代理。
- `provider-gateway` 是当前主 server 的本机计算节点代理,通过 WebSocket 主动连到 provider ingress,挂载 `/var/run/docker.sock` 作为自动任务执行主路径,使用 `pid: "host"` 读取节点级进程资源,并周期性上报系统资源指标、进程占用与 Docker daemon 状态;计算节点 provider-gateway 还必须把 egress proxy 仅发布到宿主 loopback `127.0.0.1:18789` 供本节点执行环境出网,维护用 Host SSH / WSL SSH 私钥目录只读挂载到 `/run/host-ssh`,不得作为自动任务调度主路径。
@@ -66,6 +66,6 @@ Code Queue 已从主 server 迁移到 D601 k3s/k8s,但仍必须保持明确的
## Database Volume
架构要求数据库使用 10 GB named volume;当前实现将 volume 命名为 `unidesk_pgdata_10gb` 以固定生命周期。Docker named volume 默认不强制容量上限;如需硬配额,应在主机存储层或 Docker volume driver 层配置。CLI server 控制只能使用不删除 volume 的 `down` / `up` 流程,禁止使用 `down -v`、`docker volume rm` 或删除 `unidesk_pgdata_10gb`
架构要求数据库使用 15 GB 容量预算的 named volume;当前实现将 volume 命名为 `unidesk_pgdata_10gb` 以固定生命周期并避免创建空新卷。Docker named volume 默认不强制容量上限;如需硬配额,应在主机存储层或 Docker volume driver 层配置。CLI server 控制只能使用不删除 volume 的 `down` / `up` 流程,禁止使用 `down -v`、`docker volume rm` 或删除 `unidesk_pgdata_10gb`
`/api/overview` 会返回 `pgdata` 字段,frontend `态势总览 / 核心指标` 必须展示当前 PostgreSQL 数据库占用、命名卷名称和配置容量。Docker 状态页中 `unidesk_pgdata_10gb` 的命名卷检测只对 `main-server` Provider 生效,其他计算节点不需要也不应被要求存在该数据库卷。
+3 -1
View File
@@ -2,6 +2,7 @@ import type { JsonValue, ApiEvent, ApiNode, ApiNodeDockerStatus, ApiNodeSystemSt
import { ctx, sql, config, logger } from "./context";
import type { RawTaskRow, ScheduledTaskRow, ScheduledTaskRunRow, SqlClient } from "./types";
import { compactJson, nestedNumber, redactDatabaseUrl, rowIso } from "./http";
import { compactJsonForStorage } from "./json-storage";
function errorToJson(error: unknown): JsonValue {
if (error instanceof Error) {
@@ -118,10 +119,11 @@ export async function initDatabase(client: SqlClient): Promise<void> {
export async function recordEvent(type: string, source: string, payload: Record<string, JsonValue>): Promise<void> {
logger("info", type, { source, payload });
if (!ctx.dbReady) return;
const storedPayload = compactJsonForStorage(payload);
try {
await sql()`
INSERT INTO unidesk_events (type, source, payload)
VALUES (${type}, ${source}, ${sql().json(payload)})
VALUES (${type}, ${source}, ${sql().json(storedPayload)})
`;
} catch (error) {
logger("error", "event_insert_failed", { type, source, error: errorToJson(error) });
@@ -0,0 +1,60 @@
import { createHash } from "node:crypto";
import type { JsonValue } from "../../shared/src/index";
/** Maximum length (UTF-16 code units) of an ordinary string stored inline. */
const defaultStringLimit = 8_000;
/** `bodyText` values up to this length (UTF-16 code units) are stored verbatim. */
const bodyInlineLimit = 16_384;
/** Length of the preview kept when an oversized `bodyText` is replaced by metadata. */
const bodyPreviewLimit = 2_000;
/** Arrays keep at most this many items; the remainder is summarised in a trailing marker. */
const maxArrayItems = 100;
/** Objects keep at most this many keys; the remainder is summarised in `truncatedKeys`. */
const maxObjectKeys = 100;
/** Containers (objects and arrays) at this nesting depth or deeper are replaced by a marker. */
const maxDepth = 8;
/** Placeholder stored instead of a container that is nested too deeply. */
const depthMarker = "<truncated:depth>";

/** Returns the hex-encoded SHA-256 digest of `value`. */
function hashText(value: string): string {
  return createHash("sha256").update(value).digest("hex");
}

/**
 * Returns the first `limit` UTF-16 code units of `value`, dropping a trailing
 * high surrogate so the preview never ends in half of a surrogate pair.
 * A lone surrogate is serialised by `JSON.stringify` as a `\uD8xx` escape,
 * which PostgreSQL `jsonb` input rejects.
 */
function truncatedPreview(value: string, limit: number): string {
  if (limit <= 0) return "";
  const lastUnit = value.charCodeAt(limit - 1);
  const endsOnHighSurrogate = lastUnit >= 0xd800 && lastUnit <= 0xdbff;
  return value.slice(0, endsOnHighSurrogate ? limit - 1 : limit);
}

/**
 * Returns `value` unchanged when it fits within `limit`; otherwise returns a
 * metadata object carrying the original length, a preview and a SHA-256 digest
 * of the full text.
 */
function boundedString(value: string, limit: number): JsonValue {
  if (value.length <= limit) return value;
  return {
    omitted: true,
    omittedReason: "string too large for database inline JSON storage",
    chars: value.length,
    preview: truncatedPreview(value, limit),
    sha256: hashText(value),
  };
}

/**
 * Compacts an HTTP body: bodies up to `bodyInlineLimit` are returned verbatim,
 * larger ones are replaced by a metadata object (preview, length, SHA-256).
 */
function compactBodyText(value: string): JsonValue {
  if (value.length <= bodyInlineLimit) return value;
  return {
    omitted: true,
    omittedReason: "microservice HTTP body is stored as metadata only",
    bodyPreview: truncatedPreview(value, bodyPreviewLimit),
    bodyTextChars: value.length,
    bodyTextSha256: hashText(value),
  };
}

/**
 * Produces a size-bounded JSON value suitable for inline database storage.
 *
 * - Long strings are replaced by metadata (see `boundedString`); a string found
 *   under the key `bodyText` uses the larger body limit, and when it is too
 *   large its metadata fields are merged into the parent object in place of
 *   the `bodyText` key.
 * - Arrays and objects are capped at `maxArrayItems` / `maxObjectKeys`, with a
 *   marker recording how many entries were dropped.
 * - Containers at depth `maxDepth` or deeper become `"<truncated:depth>"`, which
 *   also bounds recursion on cyclic or pathologically nested input.
 * - `undefined` follows `JSON.stringify` semantics: object properties are
 *   omitted, array items and a top-level `undefined` become `null`.
 * - Any other non-JSON value (bigint, function, symbol) is stringified.
 *
 * @param value Arbitrary value to compact.
 * @param depth Current nesting depth; callers normally leave the default.
 * @param key Property name `value` was found under, used to detect `bodyText`.
 * @returns A JSON-serialisable, size-bounded copy of `value`.
 */
export function compactJsonForStorage(value: unknown, depth = 0, key = ""): JsonValue {
  if (value === null || value === undefined) return null;
  if (typeof value === "number" || typeof value === "boolean") return value;
  if (typeof value === "string") {
    return key === "bodyText" ? compactBodyText(value) : boundedString(value, defaultStringLimit);
  }
  if (typeof value !== "object") return String(value);

  // Applies to arrays as well as objects so nested/cyclic arrays cannot recurse without bound.
  if (depth >= maxDepth) return depthMarker;

  if (Array.isArray(value)) {
    const items: JsonValue[] = value
      .slice(0, maxArrayItems)
      .map((item) => compactJsonForStorage(item, depth + 1));
    if (value.length > maxArrayItems) items.push({ truncatedItems: value.length - maxArrayItems });
    return items;
  }

  const result: Record<string, JsonValue> = {};
  // Undefined properties are dropped, exactly as JSON serialisation would do.
  const entries = Object.entries(value as Record<string, unknown>).filter(([, item]) => item !== undefined);
  for (const [childKey, item] of entries.slice(0, maxObjectKeys)) {
    if (childKey === "bodyText" && typeof item === "string") {
      const compacted = compactBodyText(item);
      if (typeof compacted === "string") {
        // Small bodies stay under their own key; Object.assign with a string
        // source would spread its characters into numeric keys instead.
        result.bodyText = compacted;
      } else {
        Object.assign(result, compacted);
      }
      continue;
    }
    result[childKey] = compactJsonForStorage(item, depth + 1, childKey);
  }
  if (entries.length > maxObjectKeys) result.truncatedKeys = entries.length - maxObjectKeys;
  return result;
}
@@ -10,6 +10,7 @@ import { recordEvent, upsertProviderNode, updateProviderHeartbeat, upsertDockerS
import { notifyTaskTerminal } from "./task-dispatcher";
import { forwardSshProviderMessage } from "./ssh-bridge";
import { handleEgressTcpOpen, handleEgressTcpData, handleEgressTcpClose } from "./egress-tcp";
import { compactJsonForStorage } from "./json-storage";
function isTerminalTaskStatus(status: string): boolean {
return status === "succeeded" || status === "failed";
@@ -147,9 +148,10 @@ export async function handleProviderMessage(ws: ProviderSocket, raw: string | Bu
return;
}
const storedResult = compactJsonForStorage(message.result ?? { message: message.message });
await sql()`
WITH incoming AS (
SELECT ${message.status}::text AS status, ${sql().json(message.result ?? { message: message.message })}::jsonb AS result
SELECT ${message.status}::text AS status, ${sql().json(storedResult)}::jsonb AS result
)
UPDATE unidesk_tasks
SET
@@ -172,7 +174,7 @@ export async function handleProviderMessage(ws: ProviderSocket, raw: string | Bu
taskId: message.taskId,
status: message.status,
message: message.message,
result: message.result ?? null,
result: storedResult,
});
if (isTerminalTaskStatus(message.status)) {
await notifyTaskTerminal(message.taskId);
+17 -8
View File
@@ -289,16 +289,23 @@ function jobRecord(value: unknown): Record<string, unknown> {
/**
 * Polls a Baidu transfer job every two seconds until it reaches a terminal
 * state or `timeoutMs` elapses.
 *
 * Errors raised by the poll request itself are treated as transient: they are
 * logged, remembered in `latestError`, and polling continues. A job that
 * reports `failed` or `canceled` is terminal and fails immediately; that
 * check runs outside the `try` so the terminal error is not swallowed by the
 * transient-error handler and turned into a misleading timeout.
 *
 * @param baseUrl Base URL passed to `baiduJson`.
 * @param jobId Identifier of the transfer job to poll.
 * @param timeoutMs Maximum time to keep polling, in milliseconds.
 * @returns The detail payload of the job once its status is `succeeded`.
 * @throws Error when the job reports `failed`/`canceled`, or when the deadline
 *   passes (the message includes the last detail and the last poll error).
 */
async function waitForBaiduTransfer(baseUrl: string, jobId: string, timeoutMs: number): Promise<Record<string, unknown>> {
  const deadline = Date.now() + timeoutMs;
  let latest: Record<string, unknown> = {};
  let latestError = "";
  while (Date.now() < deadline) {
    let detail: Record<string, unknown> | null = null;
    try {
      detail = await baiduJson(baseUrl, `/api/transfers/${encodeURIComponent(jobId)}`, {}, 30_000);
    } catch (error) {
      // Transient poll failure: keep the message for the timeout report and retry.
      latestError = error instanceof Error ? error.message : String(error);
      logger("warn", "baidu_transfer_poll_failed", { jobId, error: latestError });
    }
    if (detail !== null) {
      latest = detail;
      latestError = "";
      const job = jobRecord(detail.job);
      const status = String(job.status || "");
      if (status === "succeeded") return detail;
      // Terminal failure: thrown outside the try so it propagates to the caller.
      if (status === "failed" || status === "canceled") throw new Error(`Baidu transfer ${status}: ${String(job.error || "no error detail")}`);
    }
    await Bun.sleep(2000);
  }
  throw new Error(`Baidu transfer timed out after ${timeoutMs}ms: ${JSON.stringify(compactJson({ latest, latestError }))}`);
}
/* ─── action execution ────────────────────────────────────────────────── */
@@ -343,6 +350,7 @@ async function executePgdataBackupAction(action: PgdataBackupScheduleAction, run
await rm(tempDir, { recursive: true, force: true });
await mkdir(tempDir, { recursive: true });
let uploadDetail: Record<string, unknown> | null = null;
let uploadJobId = "";
let backupBytes = 0;
let backupSha256 = "";
try {
@@ -378,6 +386,7 @@ async function executePgdataBackupAction(action: PgdataBackupScheduleAction, run
}, 60_000);
const jobId = String(jobRecord(created.job).id || "");
if (jobId.length === 0) throw new Error(`Baidu upload did not return a job id: ${JSON.stringify(compactJson(created))}`);
uploadJobId = jobId;
uploadDetail = await waitForBaiduTransfer(action.baiduBaseUrl, jobId, uploadTimeoutMs);
const uploadedJob = jobRecord(uploadDetail.job);
return {
@@ -400,7 +409,7 @@ async function executePgdataBackupAction(action: PgdataBackupScheduleAction, run
};
} finally {
await rm(tempDir, { recursive: true, force: true }).catch(() => undefined);
if (action.cleanupLocal) await rm(backupPath, { force: true }).catch(() => undefined);
if (action.cleanupLocal && (uploadJobId.length === 0 || uploadDetail !== null)) await rm(backupPath, { force: true }).catch(() => undefined);
}
}
+1 -1
View File
@@ -637,7 +637,7 @@ function OverviewPage({ data, onRaw, onNavigate }: AnyRecord) {
h(Panel, { title: "核心指标", eyebrow: "Control" },
h("div", { className: "metric-grid" },
h(MetricCard, { label: "数据库", value: overview.dbReady ? "READY" : "WAIT", hint: "PostgreSQL internal network", tone: overview.dbReady ? "ok" : "warn" }),
h(MetricCard, { label: "PGDATA", value: fmtBytes(pgdata.databaseBytes), hint: `${pgdata.volumeName || "unidesk_pgdata_10gb"} / ${pgdata.databasePretty || "--"}`, tone: "ok", testId: "pgdata-usage-card" }),
h(MetricCard, { label: "PGDATA", value: fmtBytes(pgdata.databaseBytes), hint: `${pgdata.volumeName || "unidesk_pgdata_10gb"} / ${pgdata.databasePretty || "--"} / budget ${pgdata.volumeSize || "--"}`, tone: "ok", testId: "pgdata-usage-card" }),
h(MetricCard, { label: "在线节点", value: overview.onlineNodeCount ?? 0, hint: `${overview.nodeCount ?? 0} registered`, tone: "ok" }),
h(MetricCard, { label: "WebSocket", value: overview.activeSocketCount ?? 0, hint: "Provider ingress sockets" }),
h(MetricCard, { label: "用户服务可用", value: microserviceTotal > 0 ? `${microserviceHealthy}/${microserviceTotal}` : "--", hint: microserviceTotal > 0 ? `healthyCount ${microserviceHealthy} · unhealthyCount ${microserviceUnhealthy}` : "strict /health probes", tone: microserviceTotal > 0 && microserviceUnhealthy === 0 ? "ok" : "warn", testId: "microservice-availability-card" }),