fix: restore node resource status sync
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
---
|
||||
name: unidesk-ops
|
||||
description: UniDesk 手动运维 CLI — `server`、`gc` 和 PK01 `platform-db postgres` 子命令,覆盖主 server 启停、健康检查、swap、日志、Docker 镜像清理、磁盘 GC、服务重建和 PK01 host PostgreSQL 运维。用户提到 server start、server status、server swap、server rebuild、gc、磁盘清理、platform-db、PK01 PostgreSQL、运维时使用。
|
||||
description: UniDesk 手动运维 CLI — `server`、`gc` 和 PK01 `platform-db postgres` 子命令,覆盖主 server 启停、健康检查、swap、日志、Docker 镜像清理、磁盘 GC、服务重建/重启和 PK01 host PostgreSQL 运维。用户提到 server start、server status、server swap、server rebuild、server restart、gc、磁盘清理、platform-db、PK01 PostgreSQL、运维时使用。
|
||||
---
|
||||
|
||||
# UniDesk 手动运维 CLI
|
||||
@@ -16,9 +16,11 @@ description: UniDesk 手动运维 CLI — `server`、`gc` 和 PK01 `platform-db
|
||||
```bash
|
||||
bun scripts/cli.ts server start
|
||||
bun scripts/cli.ts server stop
|
||||
bun scripts/cli.ts server restart <service>
|
||||
```
|
||||
|
||||
异步 job 模式,返回 `job.id`、日志路径。`start` 执行 Docker 构建+启动,`stop` 停止 Compose project 全部服务。
|
||||
`restart` 是无构建单服务维护重启,使用现有镜像执行 `--no-build --no-deps --force-recreate` 并等待容器 `healthy/running`,适合刷新 provider-gateway 这类运行态异常,不能替代镜像发布或源码构建。
|
||||
|
||||
---
|
||||
|
||||
|
||||
+11
-1
@@ -1,6 +1,6 @@
|
||||
import { readConfig } from "./src/config";
|
||||
import { debugDispatch, debugHealth, debugTask, isDebugDispatchCommand, type DebugDispatchCommand } from "./src/debug";
|
||||
import { isRebuildableService, rebuildService, stackLogs, stackStatus, startStack, stopStack, unsupportedRebuildService } from "./src/docker";
|
||||
import { isRebuildableService, rebuildService, restartService, stackLogs, stackStatus, startStack, stopStack, unsupportedRebuildService, unsupportedRestartService } from "./src/docker";
|
||||
import { emitError, emitJson, emitText, isRenderedCliResult } from "./src/output";
|
||||
import { cancelJob, jobWithTail, listJobs, listJobsSummary, readJob, runJob } from "./src/jobs";
|
||||
import { checkHelp, parseCheckOptions, runChecks, runRecoveryGuardrailsCheck } from "./src/check";
|
||||
@@ -439,6 +439,16 @@ async function main(): Promise<void> {
|
||||
emitJson(commandName, rebuildService(config, third));
|
||||
return;
|
||||
}
|
||||
if (sub === "restart") {
|
||||
if (!isRebuildableService(third)) {
|
||||
const result = unsupportedRestartService(third);
|
||||
emitJson(commandName, result, false);
|
||||
process.exitCode = 1;
|
||||
return;
|
||||
}
|
||||
emitJson(commandName, restartService(config, third));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (top === "gc") {
|
||||
|
||||
@@ -100,6 +100,17 @@ export function unsupportedRebuildService(value: string | undefined): Record<str
|
||||
};
|
||||
}
|
||||
|
||||
export function unsupportedRestartService(value: string | undefined): Record<string, unknown> {
|
||||
const base = unsupportedRebuildService(value);
|
||||
return {
|
||||
...base,
|
||||
error: "unsupported-server-restart",
|
||||
reason: typeof base.reason === "string" ? base.reason.replace(/server rebuild/g, "server restart") : base.reason,
|
||||
replacement: typeof base.replacement === "string" ? base.replacement.replace(/server rebuild/g, "server restart") : base.replacement,
|
||||
policy: "server restart is a no-build maintenance entrypoint and must not silently mutate upstream images, D601 services, database, or unknown objects",
|
||||
};
|
||||
}
|
||||
|
||||
export function resolveComposeCommand(config: UniDeskConfig, envFile: string): string[] {
|
||||
const composeFile = rootPath(config.docker.composeFile);
|
||||
if (commandOk(["docker", "compose", "version"], repoRoot)) {
|
||||
@@ -431,6 +442,73 @@ export function rebuildService(config: UniDeskConfig, service: RebuildableServic
|
||||
};
|
||||
}
|
||||
|
||||
export function restartService(config: UniDeskConfig, service: RebuildableService): unknown {
|
||||
const runtimeEnv = writeComposeEnv(config, false);
|
||||
const compose = resolveComposeCommand(config, runtimeEnv.envFile);
|
||||
const listServiceContainersCommand = [
|
||||
"docker",
|
||||
"ps",
|
||||
"-q",
|
||||
"--filter",
|
||||
`label=com.docker.compose.project=${config.docker.projectName}`,
|
||||
"--filter",
|
||||
`label=com.docker.compose.service=${service}`,
|
||||
"--filter",
|
||||
"label=com.docker.compose.oneoff=False",
|
||||
];
|
||||
const upCommand = [...compose, "up", "-d", "--no-build", "--no-deps", "--force-recreate", service];
|
||||
const listAllServiceContainersCommand = [...listServiceContainersCommand];
|
||||
listAllServiceContainersCommand[2] = "-a";
|
||||
const validateScript = [
|
||||
"ready=0",
|
||||
"for attempt in $(seq 1 60); do",
|
||||
`cid=$(${shellJoin(listServiceContainersCommand)} || true)`,
|
||||
"if [ -n \"$cid\" ]; then",
|
||||
"health=$(docker inspect -f '{{if .State.Health}}{{.State.Health.Status}}{{else}}{{.State.Status}}{{end}}' $cid 2>/dev/null | head -1 || true)",
|
||||
`echo "service_container_probe service=${service} attempt=$attempt cid=$cid health=$health"`,
|
||||
"if [ \"$health\" = \"healthy\" ] || [ \"$health\" = \"running\" ]; then ready=1; break; fi",
|
||||
"else",
|
||||
`echo "service_container_probe service=${service} attempt=$attempt cid=missing"`,
|
||||
"fi",
|
||||
"sleep 1",
|
||||
"done",
|
||||
"if [ \"$ready\" != \"1\" ]; then",
|
||||
`echo "service_container_not_ready service=${service}" >&2`,
|
||||
`${shellJoin(listAllServiceContainersCommand)} --format '{{.ID}} {{.Names}} {{.Status}}' >&2 || true`,
|
||||
"exit 1",
|
||||
"fi",
|
||||
].join("\n");
|
||||
const script = [
|
||||
"set -euo pipefail",
|
||||
`echo ${shellJoin(["restart_service", service, "no_build_force_recreate_with_validation"])}`,
|
||||
restrictedHostAccessScript(config),
|
||||
shellJoin(upCommand),
|
||||
validateScript,
|
||||
].join("\n");
|
||||
const command = ["bash", "-lc", composeLockedScript(script)];
|
||||
const job = startJob("server_restart", command, `Restart and validate UniDesk ${service} without building images`);
|
||||
return {
|
||||
job,
|
||||
runtimeEnv,
|
||||
service,
|
||||
command,
|
||||
strategy: {
|
||||
buildBeforeReplace: false,
|
||||
noBuild: true,
|
||||
replaceScope: {
|
||||
projectLabel: config.docker.projectName,
|
||||
serviceLabel: service,
|
||||
},
|
||||
noDeps: true,
|
||||
forceRecreate: true,
|
||||
composeMutationLock: rootPath(".state", "locks", "server-compose.lock"),
|
||||
jobRunner: "local",
|
||||
postUpValidation: true,
|
||||
namedVolumesPreserved: true,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function fixedPorts(config: UniDeskConfig): Array<{ name: string; port: number; listening: boolean }> {
|
||||
return [
|
||||
{ name: "frontend", port: config.network.frontend.port, listening: isPortListening(config.network.frontend.port) },
|
||||
|
||||
+4
-1
@@ -19,6 +19,7 @@ export function rootHelp(): unknown {
|
||||
{ command: "server cleanup plan [--min-age-hours N] [--limit N]", description: "Dry-run Docker image cleanup plan only: list active/protected images, stale candidates older than the default 24h threshold, risk, estimated reclaim, and manual review commands without deleting anything." },
|
||||
{ command: "gc plan|run|db-trace|policy|remote [--confirm] [--logs-keep-days N] [--include-browser-cache]", description: "One-time main-server or remote provider disk relief and low-risk anti-bloat policy for logs, journald, allowlisted /tmp artifacts, scoped core dumps and explicit trace telemetry retention; plan is read-only and run requires --confirm." },
|
||||
{ command: "server rebuild <backend-core|frontend|dev-frontend-proxy|provider-gateway|todo-note|code-queue-mgr|project-manager|baidu-netdisk|oa-event-flow>", description: "Maintenance-only local Compose rebuild for reviewed main-server services; frontend standard release must use CI artifact plus deploy apply dev/prod artifact consumers." },
|
||||
{ command: "server restart <backend-core|frontend|dev-frontend-proxy|provider-gateway|todo-note|code-queue-mgr|project-manager|baidu-netdisk|oa-event-flow>", description: "No-build single-service Compose restart for reviewed main-server maintenance recovery; returns an async job and validates the recreated container." },
|
||||
{ command: "provider attach <providerId> [--master-server URL] [--up] [--force] | provider triage <providerId> [--observed-error text] [--observed-scope scope] [--microservice id ...] [--full|--raw]", description: "Generate the minimal external provider-gateway env/compose bundle or run the low-noise read-only provider health triage contract." },
|
||||
{ command: "trans <route> [operation args...] (alias of ssh <route> ...)", description: "Open a Host SSH / WSL SSH maintenance session; provider WebSocket carries control and host.ssh.tcp-pool carries stdin/stdout/stderr data." },
|
||||
{ command: "trans gh:/owner/repo[/pr|/issue][/number[/1]] ls|cat|rg|patch-apply", description: "Treat GitHub PRs/issues as virtual text directories; `ls --full` shows state/floors/body length, and `patch-apply` updates first-floor `body.md` through UniDesk gh plus apply-patch v2." },
|
||||
@@ -101,7 +102,7 @@ export function isHelpToken(value: string | undefined): boolean {
|
||||
|
||||
export function serverHelp(action: string | undefined = undefined): unknown {
|
||||
return {
|
||||
command: action === undefined || isHelpToken(action) ? "server start|stop|status|swap|logs|cleanup|rebuild" : `server ${action}`,
|
||||
command: action === undefined || isHelpToken(action) ? "server start|stop|status|swap|logs|cleanup|rebuild|restart" : `server ${action}`,
|
||||
output: "json",
|
||||
description: "Manage the fixed main-server Docker Compose stack without exposing backend-core REST publicly.",
|
||||
usage: {
|
||||
@@ -112,6 +113,7 @@ export function serverHelp(action: string | undefined = undefined): unknown {
|
||||
logs: "bun scripts/cli.ts server logs [--tail-bytes N]",
|
||||
cleanup: "bun scripts/cli.ts server cleanup plan [--min-age-hours N] [--limit N]",
|
||||
rebuild: "bun scripts/cli.ts server rebuild <backend-core|frontend|dev-frontend-proxy|provider-gateway|todo-note|code-queue-mgr|project-manager|baidu-netdisk|oa-event-flow>",
|
||||
restart: "bun scripts/cli.ts server restart <backend-core|frontend|dev-frontend-proxy|provider-gateway|todo-note|code-queue-mgr|project-manager|baidu-netdisk|oa-event-flow>",
|
||||
},
|
||||
cleanupPlan: {
|
||||
dryRunOnly: true,
|
||||
@@ -138,6 +140,7 @@ export function serverHelp(action: string | undefined = undefined): unknown {
|
||||
maintenanceOnly: {
|
||||
frontend: "server rebuild frontend is not standard release evidence; publish 127.0.0.1:5000/unidesk/frontend:<commit> with CI, then consume it with deploy apply --env dev/prod --service frontend --commit <full-sha>",
|
||||
userServices: "server rebuild for main-server user services is reserved for local maintenance, bootstrap or recovery and must not replace commit-pinned artifact CD",
|
||||
restart: "server restart is for no-build single-service recovery when the existing image is already present and the goal is to refresh runtime state",
|
||||
},
|
||||
nonRebuildableBoundary: {
|
||||
upstreamImages: ["filebrowser", "filebrowser-d601"],
|
||||
|
||||
@@ -164,11 +164,15 @@ function systemStatusSignal(debug: unknown, providerId: string): ProviderTriageS
|
||||
if (item === null) return signal("backend-core-system-status", "provider-gateway", "unknown", `no system status sample for ${providerId}`);
|
||||
const current = asRecord(item.current);
|
||||
const currentOk = current === null ? null : current.ok;
|
||||
const status: ProviderSignalStatus = current === null ? "unknown" : currentOk === false ? "degraded" : "ok";
|
||||
return signal("backend-core-system-status", "provider-gateway", status, `system status current.ok=${String(currentOk)} updatedAt=${item.updatedAt ?? "null"}`, {
|
||||
const stale = item.stale === true;
|
||||
const status: ProviderSignalStatus = current === null ? stale ? "degraded" : "unknown" : currentOk === false ? "degraded" : "ok";
|
||||
return signal("backend-core-system-status", "provider-gateway", status, `system status current.ok=${String(currentOk)} stale=${String(stale)} updatedAt=${item.updatedAt ?? "null"}`, {
|
||||
providerId: item.providerId,
|
||||
nodeStatus: item.nodeStatus,
|
||||
updatedAt: item.updatedAt,
|
||||
currentCollectedAt: item.currentCollectedAt ?? null,
|
||||
stale,
|
||||
staleSeconds: item.staleSeconds ?? null,
|
||||
current: current === null ? null : {
|
||||
ok: current.ok,
|
||||
collectedAt: current.collectedAt,
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Context;
|
||||
use chrono::{DateTime, Utc};
|
||||
use chrono::{DateTime, Duration, Utc};
|
||||
use serde_json::{json, Value};
|
||||
use tokio_postgres::Row;
|
||||
|
||||
@@ -192,18 +192,43 @@ pub async fn update_provider_heartbeat(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn provider_collected_at(
|
||||
state: &Arc<AppState>,
|
||||
provider_id: &str,
|
||||
source: &str,
|
||||
value: &str,
|
||||
) -> DateTime<Utc> {
|
||||
match DateTime::parse_from_rfc3339(value) {
|
||||
Ok(parsed) => parsed.with_timezone(&Utc),
|
||||
Err(error) => {
|
||||
state.log(
|
||||
"warn",
|
||||
"provider_status_collected_at_invalid",
|
||||
Some(json!({
|
||||
"providerId": provider_id,
|
||||
"source": source,
|
||||
"value": value,
|
||||
"error": error.to_string()
|
||||
})),
|
||||
);
|
||||
Utc::now()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn upsert_docker_status(
|
||||
state: &Arc<AppState>,
|
||||
provider_id: &str,
|
||||
status: &JsonValue,
|
||||
collected_at: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
let collected_at = provider_collected_at(state, provider_id, "docker_status", collected_at);
|
||||
let client = state.pool.get().await?;
|
||||
client
|
||||
.execute(
|
||||
r#"
|
||||
INSERT INTO unidesk_node_docker_status (provider_id, status, collected_at, updated_at)
|
||||
VALUES ($1, $2, $3::timestamptz, now())
|
||||
VALUES ($1, $2, $3, now())
|
||||
ON CONFLICT (provider_id) DO UPDATE SET
|
||||
status = EXCLUDED.status,
|
||||
collected_at = EXCLUDED.collected_at,
|
||||
@@ -224,12 +249,13 @@ pub async fn upsert_system_status(
|
||||
let cpu_percent = nested_number(status, "cpu", "percent");
|
||||
let memory_percent = nested_number(status, "memory", "percent");
|
||||
let disk_percent = nested_number(status, "disk", "percent");
|
||||
let collected_at = provider_collected_at(state, provider_id, "system_status", collected_at);
|
||||
let mut client = state.pool.get().await?;
|
||||
let tx = client.transaction().await?;
|
||||
tx.execute(
|
||||
r#"
|
||||
INSERT INTO unidesk_node_system_status (provider_id, status, collected_at, updated_at)
|
||||
VALUES ($1, $2, $3::timestamptz, now())
|
||||
VALUES ($1, $2, $3, now())
|
||||
ON CONFLICT (provider_id) DO UPDATE SET
|
||||
status = EXCLUDED.status,
|
||||
collected_at = EXCLUDED.collected_at,
|
||||
@@ -241,10 +267,18 @@ pub async fn upsert_system_status(
|
||||
tx.execute(
|
||||
r#"
|
||||
INSERT INTO unidesk_node_metric_samples (provider_id, collected_at, cpu_percent, memory_percent, disk_percent, sample)
|
||||
VALUES ($1, $2::timestamptz, $3, $4, $5, $6)
|
||||
VALUES ($1, $2, $3, $4, $5, $6)
|
||||
"#,
|
||||
&[&provider_id, &collected_at, &cpu_percent, &memory_percent, &disk_percent, status],
|
||||
).await?;
|
||||
&[
|
||||
&provider_id,
|
||||
&collected_at,
|
||||
&cpu_percent,
|
||||
&memory_percent,
|
||||
&disk_percent,
|
||||
status,
|
||||
],
|
||||
)
|
||||
.await?;
|
||||
tx.execute(
|
||||
r#"
|
||||
DELETE FROM unidesk_node_metric_samples
|
||||
@@ -321,6 +355,18 @@ fn metric_point_from_sample(sample: &JsonValue, collected_at: &str) -> JsonValue
|
||||
})
|
||||
}
|
||||
|
||||
const SYSTEM_STATUS_STALE_AFTER_SECONDS: i64 = 300;
|
||||
|
||||
fn system_status_is_stale(collected_at: Option<DateTime<Utc>>, now: DateTime<Utc>) -> bool {
|
||||
collected_at
|
||||
.map(|value| now.signed_duration_since(value) > Duration::seconds(SYSTEM_STATUS_STALE_AFTER_SECONDS))
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
fn system_status_age_seconds(collected_at: Option<DateTime<Utc>>, now: DateTime<Utc>) -> Option<i64> {
|
||||
collected_at.map(|value| now.signed_duration_since(value).num_seconds().max(0))
|
||||
}
|
||||
|
||||
pub async fn get_node_system_statuses(
|
||||
state: &Arc<AppState>,
|
||||
limit: i64,
|
||||
@@ -328,7 +374,7 @@ pub async fn get_node_system_statuses(
|
||||
let client = state.pool.get().await?;
|
||||
let current_rows = client.query(
|
||||
r#"
|
||||
SELECT n.provider_id, n.name, n.status AS node_status, s.status AS system_status, s.updated_at
|
||||
SELECT n.provider_id, n.name, n.status AS node_status, s.status AS system_status, s.collected_at AS system_collected_at, s.updated_at
|
||||
FROM unidesk_nodes n
|
||||
LEFT JOIN unidesk_node_system_status s ON s.provider_id = n.provider_id
|
||||
ORDER BY n.status DESC, n.provider_id ASC
|
||||
@@ -366,15 +412,23 @@ pub async fn get_node_system_statuses(
|
||||
&collected_at.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
|
||||
));
|
||||
}
|
||||
let now = Utc::now();
|
||||
Ok(current_rows.into_iter().map(|row| {
|
||||
let provider_id: String = row.get("provider_id");
|
||||
let updated_at: Option<DateTime<Utc>> = row.get("updated_at");
|
||||
let system_collected_at: Option<DateTime<Utc>> = row.get("system_collected_at");
|
||||
let system_status: Option<JsonValue> = row.get("system_status");
|
||||
let stale = system_status_is_stale(system_collected_at, now);
|
||||
let current = if stale { None } else { system_status.clone() };
|
||||
json!({
|
||||
"providerId": provider_id,
|
||||
"name": row.get::<_, String>("name"),
|
||||
"nodeStatus": if row.get::<_, String>("node_status") == "online" { "online" } else { "offline" },
|
||||
"current": system_status,
|
||||
"current": current,
|
||||
"lastKnown": system_status,
|
||||
"currentCollectedAt": system_collected_at.map(|value| value.to_rfc3339_opts(chrono::SecondsFormat::Millis, true)),
|
||||
"stale": stale,
|
||||
"staleSeconds": system_status_age_seconds(system_collected_at, now),
|
||||
"history": history_by_provider.remove(&provider_id).unwrap_or_default(),
|
||||
"updatedAt": updated_at.map(|value| value.to_rfc3339_opts(chrono::SecondsFormat::Millis, true)),
|
||||
})
|
||||
|
||||
@@ -237,9 +237,22 @@ function metricPointFromSample(sample: unknown, collectedAt: string): JsonValue
|
||||
};
|
||||
}
|
||||
|
||||
const SYSTEM_STATUS_STALE_AFTER_SECONDS = 300;
|
||||
|
||||
function systemStatusAgeSeconds(collectedAt: unknown, now: Date): number | null {
|
||||
const date = collectedAt instanceof Date ? collectedAt : typeof collectedAt === "string" ? new Date(collectedAt) : null;
|
||||
if (date === null || Number.isNaN(date.getTime())) return null;
|
||||
return Math.max(0, Math.floor((now.getTime() - date.getTime()) / 1000));
|
||||
}
|
||||
|
||||
function systemStatusIsStale(collectedAt: unknown, now: Date): boolean {
|
||||
const ageSeconds = systemStatusAgeSeconds(collectedAt, now);
|
||||
return ageSeconds !== null && ageSeconds > SYSTEM_STATUS_STALE_AFTER_SECONDS;
|
||||
}
|
||||
|
||||
export async function getNodeSystemStatuses(limit: number): Promise<ApiNodeSystemStatus[]> {
|
||||
const currentRows = await sql()<Array<Record<string, unknown>>>`
|
||||
SELECT n.provider_id, n.name, n.status AS node_status, s.status AS system_status, s.updated_at
|
||||
SELECT n.provider_id, n.name, n.status AS node_status, s.status AS system_status, s.collected_at AS system_collected_at, s.updated_at
|
||||
FROM unidesk_nodes n
|
||||
LEFT JOIN unidesk_node_system_status s ON s.provider_id = n.provider_id
|
||||
ORDER BY n.status DESC, n.provider_id ASC
|
||||
@@ -265,13 +278,21 @@ export async function getNodeSystemStatuses(limit: number): Promise<ApiNodeSyste
|
||||
history.push(metricPointFromSample(row.sample ?? {}, collectedAt));
|
||||
historyByProvider.set(providerId, history);
|
||||
}
|
||||
const now = new Date();
|
||||
return currentRows.map((row) => {
|
||||
const providerId = String(row.provider_id);
|
||||
const lastKnown = row.system_status === null || row.system_status === undefined ? null : (row.system_status as JsonValue);
|
||||
const currentCollectedAt = row.system_collected_at instanceof Date ? row.system_collected_at.toISOString() : row.system_collected_at === null || row.system_collected_at === undefined ? null : String(row.system_collected_at);
|
||||
const stale = systemStatusIsStale(row.system_collected_at, now);
|
||||
return {
|
||||
providerId,
|
||||
name: String(row.name),
|
||||
nodeStatus: row.node_status === "online" ? "online" : "offline",
|
||||
current: row.system_status === null || row.system_status === undefined ? null : (row.system_status as JsonValue),
|
||||
current: stale ? null : lastKnown,
|
||||
lastKnown,
|
||||
currentCollectedAt,
|
||||
stale,
|
||||
staleSeconds: systemStatusAgeSeconds(row.system_collected_at, now),
|
||||
history: historyByProvider.get(providerId) ?? [],
|
||||
updatedAt: row.updated_at instanceof Date ? row.updated_at.toISOString() : row.updated_at === null || row.updated_at === undefined ? null : String(row.updated_at),
|
||||
};
|
||||
|
||||
@@ -571,22 +571,22 @@ async fn provider_socket_task(state: Arc<AppState>, socket: WebSocket) {
|
||||
while let Some(message) = receiver_ws.next().await {
|
||||
match message {
|
||||
Ok(Message::Text(text)) => {
|
||||
if let Err(error) = handle_provider_text(&state, &connection, text).await {
|
||||
if let Err(error) = handle_provider_text(&state, &connection, &text).await {
|
||||
state.log(
|
||||
"error",
|
||||
"provider_message_failed",
|
||||
Some(json!({ "error": error.to_string() })),
|
||||
Some(provider_message_error_context(&text, &error)),
|
||||
);
|
||||
let _ = tx.send(Message::Text(json!({ "type": "ack", "requestId": "message", "ok": false, "message": error.to_string() }).to_string()));
|
||||
}
|
||||
}
|
||||
Ok(Message::Binary(bytes)) => {
|
||||
if let Ok(text) = String::from_utf8(bytes.to_vec()) {
|
||||
if let Err(error) = handle_provider_text(&state, &connection, text).await {
|
||||
if let Err(error) = handle_provider_text(&state, &connection, &text).await {
|
||||
state.log(
|
||||
"error",
|
||||
"provider_message_failed",
|
||||
Some(json!({ "error": error.to_string() })),
|
||||
Some(provider_message_error_context(&text, &error)),
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -629,12 +629,36 @@ async fn provider_socket_task(state: Arc<AppState>, socket: WebSocket) {
|
||||
send_task.abort();
|
||||
}
|
||||
|
||||
fn provider_message_error_context(text: &str, error: &anyhow::Error) -> Value {
|
||||
let parsed = serde_json::from_str::<Value>(text).ok();
|
||||
json!({
|
||||
"error": error.to_string(),
|
||||
"messageType": parsed
|
||||
.as_ref()
|
||||
.and_then(|value| value.get("type"))
|
||||
.and_then(Value::as_str)
|
||||
.unwrap_or("unknown"),
|
||||
"providerId": parsed
|
||||
.as_ref()
|
||||
.and_then(|value| value.get("providerId"))
|
||||
.and_then(Value::as_str)
|
||||
.unwrap_or("unknown"),
|
||||
"requestId": parsed
|
||||
.as_ref()
|
||||
.and_then(|value| value.get("requestId"))
|
||||
.and_then(Value::as_str)
|
||||
.unwrap_or("unknown"),
|
||||
"textBytes": text.len(),
|
||||
"textPreview": text.chars().take(300).collect::<String>(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn handle_provider_text(
|
||||
state: &Arc<AppState>,
|
||||
connection: &Arc<ProviderConnection>,
|
||||
text: String,
|
||||
text: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
let message: Value = serde_json::from_str(&text)?;
|
||||
let message: Value = serde_json::from_str(text)?;
|
||||
let provider_id = message
|
||||
.get("providerId")
|
||||
.and_then(Value::as_str)
|
||||
@@ -845,6 +869,7 @@ pub async fn mark_stale_providers_offline(state: &Arc<AppState>) -> anyhow::Resu
|
||||
return Ok(());
|
||||
}
|
||||
let timeout_ms = state.config.heartbeat_timeout_ms as i64;
|
||||
let cutoff = Utc::now() - Duration::milliseconds(timeout_ms);
|
||||
let client = state.pool.get().await?;
|
||||
let rows = client
|
||||
.query(
|
||||
@@ -853,10 +878,10 @@ pub async fn mark_stale_providers_offline(state: &Arc<AppState>) -> anyhow::Resu
|
||||
SET status = 'offline', updated_at = now()
|
||||
WHERE status = 'online'
|
||||
AND last_heartbeat IS NOT NULL
|
||||
AND last_heartbeat < now() - ($1 * interval '1 millisecond')
|
||||
AND last_heartbeat < $1
|
||||
RETURNING provider_id
|
||||
"#,
|
||||
&[&timeout_ms],
|
||||
&[&cutoff],
|
||||
)
|
||||
.await?;
|
||||
for row in rows {
|
||||
|
||||
File diff suppressed because one or more lines are too long
@@ -784,11 +784,50 @@ function HeartbeatPage({ nodes }: AnyRecord) {
|
||||
);
|
||||
}
|
||||
|
||||
function metricPointFromCurrent(current: AnyRecord): AnyRecord {
|
||||
const cpu = current?.cpu || {};
|
||||
const memory = current?.memory || {};
|
||||
const disk = current?.disk || {};
|
||||
return {
|
||||
at: current?.collectedAt,
|
||||
cpuPercent: asNumber(cpu.percent),
|
||||
memoryPercent: asNumber(memory.percent),
|
||||
diskPercent: asNumber(disk.percent),
|
||||
memoryUsedBytes: asNumber(memory.usedBytes),
|
||||
memoryTotalBytes: asNumber(memory.totalBytes),
|
||||
diskUsedBytes: asNumber(disk.usedBytes),
|
||||
diskTotalBytes: asNumber(disk.totalBytes),
|
||||
load1: asNumber(cpu.load1),
|
||||
};
|
||||
}
|
||||
|
||||
function synchronizedMetricPoints(history: any[], current: AnyRecord | null): AnyRecord[] {
|
||||
if (!current) return [];
|
||||
const currentPoint = metricPointFromCurrent(current);
|
||||
const currentAt = timeMs(currentPoint.at);
|
||||
const points = (Array.isArray(history) ? history : [])
|
||||
.filter((point) => point && typeof point === "object")
|
||||
.filter((point) => currentAt === null || timeMs(point.at) !== currentAt);
|
||||
points.push(currentPoint);
|
||||
return points
|
||||
.sort((left, right) => (timeMs(left.at) ?? 0) - (timeMs(right.at) ?? 0))
|
||||
.slice(-60);
|
||||
}
|
||||
|
||||
function NodeMonitorPage({ nodes, systemStatuses, tasks, onRaw, refresh }: AnyRecord) {
|
||||
const [selectedProvider, setSelectedProvider] = useState("");
|
||||
const merged = useMemo(() => nodes.map((node: any) => {
|
||||
const status = systemStatuses.find((item: any) => item.providerId === node.providerId);
|
||||
return { ...node, systemCurrent: status?.current || null, systemHistory: status?.history || [], systemUpdatedAt: status?.updatedAt || null };
|
||||
return {
|
||||
...node,
|
||||
systemCurrent: status?.current || null,
|
||||
systemLastKnown: status?.lastKnown || null,
|
||||
systemCurrentCollectedAt: status?.currentCollectedAt || null,
|
||||
systemStale: Boolean(status?.stale),
|
||||
systemStaleSeconds: status?.staleSeconds ?? null,
|
||||
systemHistory: status?.history || [],
|
||||
systemUpdatedAt: status?.updatedAt || null,
|
||||
};
|
||||
}), [nodes, systemStatuses]);
|
||||
const active = merged.find((node: any) => node.providerId === selectedProvider) || merged[0] || null;
|
||||
useEffect(() => {
|
||||
@@ -802,12 +841,10 @@ function NodeMonitorPage({ nodes, systemStatuses, tasks, onRaw, refresh }: AnyRe
|
||||
const cpu = current?.cpu || {};
|
||||
const memory = current?.memory || {};
|
||||
const disk = current?.disk || {};
|
||||
const points = history.length > 0 ? history : current ? [{
|
||||
at: current.collectedAt,
|
||||
cpuPercent: asNumber(cpu.percent),
|
||||
memoryPercent: asNumber(memory.percent),
|
||||
diskPercent: asNumber(disk.percent),
|
||||
}] : [];
|
||||
const points = synchronizedMetricPoints(history, current);
|
||||
const staleText = active.systemCurrentCollectedAt
|
||||
? `最后采样 ${fmtDate(active.systemCurrentCollectedAt)},已过期 ${fmtDuration(asNumber(active.systemStaleSeconds))}`
|
||||
: "最后采样时间不可用";
|
||||
|
||||
return h("div", { className: "monitor-page", "data-testid": "node-monitor-page" },
|
||||
h("div", { className: "docker-node-strip" },
|
||||
@@ -820,7 +857,7 @@ function NodeMonitorPage({ nodes, systemStatuses, tasks, onRaw, refresh }: AnyRe
|
||||
h("span", { className: `pulse ${node.status}` }),
|
||||
h("strong", null, node.name),
|
||||
h("code", null, node.providerId),
|
||||
h("span", null, node.systemCurrent ? `CPU ${fmtPercent(node.systemCurrent.cpu?.percent)} / MEM ${fmtPercent(node.systemCurrent.memory?.percent)}` : "等待指标"),
|
||||
h("span", null, node.systemCurrent ? `CPU ${fmtPercent(node.systemCurrent.cpu?.percent)} / MEM ${fmtPercent(node.systemCurrent.memory?.percent)}` : node.systemStale ? "指标过期" : "等待指标"),
|
||||
)),
|
||||
),
|
||||
h("div", { className: "monitor-layout" },
|
||||
@@ -828,9 +865,9 @@ function NodeMonitorPage({ nodes, systemStatuses, tasks, onRaw, refresh }: AnyRe
|
||||
title: "任务管理器视图",
|
||||
eyebrow: active.name,
|
||||
className: "monitor-main-panel",
|
||||
actions: current ? h(RawButton, { title: `System ${active.providerId}`, data: { current, history }, onOpen: onRaw }) : null,
|
||||
actions: current || active.systemStale ? h(RawButton, { title: `System ${active.providerId}`, data: { current, lastKnown: active.systemLastKnown, currentCollectedAt: active.systemCurrentCollectedAt, stale: active.systemStale, staleSeconds: active.systemStaleSeconds, history }, onOpen: onRaw }) : null,
|
||||
},
|
||||
!current ? h(EmptyState, { title: "系统指标未上报", text: "provider-gateway 会周期性采集 /proc 与 df,并保存历史曲线" }) :
|
||||
!current ? h(EmptyState, { title: active.systemStale ? "系统指标已过期" : "系统指标未上报", text: active.systemStale ? `${staleText};等待 provider-gateway 恢复 system.status 上报` : "provider-gateway 会周期性采集 /proc 与 df,并保存历史曲线" }) :
|
||||
h("div", null,
|
||||
h("div", { className: "monitor-hero" },
|
||||
h("div", null,
|
||||
@@ -854,7 +891,7 @@ function NodeMonitorPage({ nodes, systemStatuses, tasks, onRaw, refresh }: AnyRe
|
||||
h(MetricCard, { label: "CPU 当前", value: fmtPercent(cpu.percent), hint: `history ${points.length} samples`, tone: "ok" }),
|
||||
h(MetricCard, { label: "实际内存", value: fmtBytes(memory.usedBytes), hint: `${fmtPercent(memory.percent)} 不含缓存` }),
|
||||
h(MetricCard, { label: "硬盘已用", value: fmtBytes(disk.usedBytes), hint: fmtPercent(disk.percent) }),
|
||||
h(MetricCard, { label: "更新时间", value: fmtDate(active.systemUpdatedAt || current.collectedAt), hint: active.providerId }),
|
||||
h(MetricCard, { label: "更新时间", value: fmtDate(current.collectedAt || active.systemUpdatedAt), hint: active.providerId }),
|
||||
),
|
||||
h(ProcessResourceTable, { current, onRaw }),
|
||||
),
|
||||
|
||||
@@ -278,6 +278,10 @@ export interface ApiNodeSystemStatus {
|
||||
name: string;
|
||||
nodeStatus: "online" | "offline";
|
||||
current: JsonValue | null;
|
||||
lastKnown?: JsonValue | null;
|
||||
currentCollectedAt?: string | null;
|
||||
stale?: boolean;
|
||||
staleSeconds?: number | null;
|
||||
history: JsonValue[];
|
||||
updatedAt: string | null;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user