fix: make sub2api codex pool validation observable
This commit is contained in:
@@ -19,6 +19,9 @@ const defaultPoolApiKeySecretName = "sub2api-codex-pool-api-key";
|
||||
// Pool defaults. Their consumers are outside this excerpt, so the exact
// semantics should be confirmed against the callers.
const defaultPoolApiKeySecretKey = "API_KEY";
const defaultMinOwnerBalanceUsd = 1000;
const defaultAccountPriority = 10;

/** Remote directory that holds each detached job's script, stdout, stderr, exit-code, done-marker and pid files. */
const remoteJobDir = "/tmp/unidesk-platform-infra-sub2api-codex-pool";

/** Upper bound, in milliseconds, on how long a detached remote job is polled (15 minutes). */
const remoteJobTimeoutMs = 15 * 60_000;

/** Pause, in milliseconds, between two consecutive status probes of a detached remote job. */
const remoteJobPollMs = 5_000;
|
||||
|
||||
interface DisclosureOptions {
|
||||
full: boolean;
|
||||
@@ -296,7 +299,7 @@ async function codexPoolSync(config: UniDeskConfig, options: SyncOptions): Promi
|
||||
tempUnschedulableCredentials: renderSub2ApiTempUnschedulableCredentials(profile.tempUnschedulable),
|
||||
})),
|
||||
};
|
||||
const result = await capture(config, g14K3sRoute, ["script"], syncScript(payload, pool));
|
||||
const result = await runRemoteCodexPoolScript(config, "sync", syncScript(payload, pool));
|
||||
const parsed = parseJsonOutput(result.stdout);
|
||||
if (options.raw) {
|
||||
return {
|
||||
@@ -322,7 +325,7 @@ async function codexPoolSync(config: UniDeskConfig, options: SyncOptions): Promi
|
||||
|
||||
async function codexPoolValidate(config: UniDeskConfig, options: DisclosureOptions): Promise<Record<string, unknown>> {
|
||||
const pool = readCodexPoolConfig();
|
||||
const result = await capture(config, g14K3sRoute, ["script"], validateScript(pool));
|
||||
const result = await runRemoteCodexPoolScript(config, "validate", validateScript(pool));
|
||||
const parsed = parseJsonOutput(result.stdout);
|
||||
if (options.raw) {
|
||||
return {
|
||||
@@ -3287,6 +3290,191 @@ async function capture(config: UniDeskConfig, target: string, args: string[], in
|
||||
return await runSshCommandCapture(config, target, args, input);
|
||||
}
|
||||
|
||||
/**
 * Runs a codex-pool script on the remote host as a detached job and polls it until it ends.
 *
 * The job is launched with `remoteJobStartScript`; afterwards `remoteJobStatusScript` is probed
 * every `remoteJobPollMs`. One JSON progress line is written to stderr per probe.
 *
 * Polling stops when:
 * - the job reports `succeeded` or `failed` (its stdout/stderr/exit code are returned),
 * - a probe command itself exits non-zero (the probe result is returned),
 * - the job can be located neither as running nor as finished for several probes in a row
 *   (exit code 1), or
 * - `remoteJobTimeoutMs` has elapsed (exit code 124).
 *
 * @param config UniDesk configuration forwarded to `capture`.
 * @param mode Operation the script performs; used in the job name, progress events and messages.
 * @param script Shell script body to execute remotely.
 * @returns The start command's result if launching failed, otherwise the job outcome as described above.
 */
async function runRemoteCodexPoolScript(config: UniDeskConfig, mode: "sync" | "validate", script: string): Promise<SshCaptureResult> {
  // A single "unknown" can be a harmless race with a job that is just finishing, so only a run
  // of consecutive unlocatable probes is treated as a dead job.
  const maxConsecutiveUnknown = 3;

  const startedAtMs = Date.now();
  // Truncated to 63 characters, matching the original naming scheme.
  const jobName = `codex-pool-${mode}-${startedAtMs.toString(36)}`.slice(0, 63);

  const start = await capture(config, g14K3sRoute, ["script"], remoteJobStartScript(jobName, script));
  const started = parseJsonOutput(start.stdout);
  // Launch failed or did not report ok: hand the raw start result back to the caller.
  if (start.exitCode !== 0 || boolField(started, "ok", false) !== true) return start;

  let latest: RemoteCodexPoolJobStatus | null = null;
  let consecutiveUnknown = 0;

  while (Date.now() - startedAtMs <= remoteJobTimeoutMs) {
    await sleep(remoteJobPollMs);

    const probe = await capture(config, g14K3sRoute, ["script"], remoteJobStatusScript(jobName));
    latest = normalizeRemoteJobStatus(parseJsonOutput(probe.stdout));

    // Progress goes to stderr so stdout stays reserved for the job's own output.
    process.stderr.write(`${JSON.stringify({
      event: "platform-infra.sub2api.codex-pool.progress",
      at: new Date().toISOString(),
      mode,
      jobName,
      status: latest?.status ?? "unknown",
      exitCode: latest?.exitCode ?? null,
      elapsedMs: Date.now() - startedAtMs,
      stdoutBytes: latest?.stdoutBytes ?? null,
      stderrBytes: latest?.stderrBytes ?? null,
    })}\n`);

    if (probe.exitCode !== 0) {
      return {
        exitCode: probe.exitCode,
        stdout: probe.stdout,
        stderr: `${start.stderr}${probe.stderr}`,
      };
    }

    if (latest?.status === "succeeded" || latest?.status === "failed") {
      return {
        // Fall back to a conventional code when the exit file could not be read.
        exitCode: latest.exitCode ?? (latest.status === "succeeded" ? 0 : 1),
        stdout: latest.stdout ?? "",
        stderr: latest.stderr ?? "",
      };
    }

    if (latest === null || latest.status === "unknown") {
      consecutiveUnknown += 1;
      if (consecutiveUnknown >= maxConsecutiveUnknown) {
        // The job is neither alive nor marked done: waiting for the full timeout cannot help.
        return {
          exitCode: 1,
          stdout: latest?.stdout ?? "",
          stderr: [
            latest?.stderr ?? "",
            `remote codex-pool ${mode} job ${jobName} was neither running nor finished for ${consecutiveUnknown} consecutive status probes; its process likely exited without writing a completion marker`,
            `remote job directory: ${remoteJobDir}`,
          ].filter(Boolean).join("\n"),
        };
      }
    } else {
      consecutiveUnknown = 0;
    }
  }

  return {
    exitCode: 124,
    stdout: latest?.stdout ?? "",
    stderr: [
      latest?.stderr ?? "",
      `remote codex-pool ${mode} job ${jobName} did not finish within ${remoteJobTimeoutMs}ms`,
      `status command: bun scripts/cli.ts platform-infra sub2api codex-pool ${mode === "sync" ? "sync --confirm" : "validate"}`,
    ].filter(Boolean).join("\n"),
  };
}
|
||||
|
||||
/**
 * Snapshot of a detached remote job, as decoded from the JSON printed by the status probe.
 * Every field except `status` is `null` when the probe output did not carry a value of the
 * expected type.
 */
interface RemoteCodexPoolJobStatus {
  /** Lifecycle state; anything unrecognised is reported as `"unknown"`. */
  status: "running" | "succeeded" | "failed" | "unknown";
  /** Exit code reported for the job, if any. */
  exitCode: number | null;
  /** Captured standard output text reported by the probe. */
  stdout: string | null;
  /** Captured standard error text reported by the probe. */
  stderr: string | null;
  /** Size in bytes reported for the job's stdout file. */
  stdoutBytes: number | null;
  /** Size in bytes reported for the job's stderr file. */
  stderrBytes: number | null;
}
|
||||
|
||||
/**
 * Converts the loosely typed JSON object printed by the status probe into a
 * `RemoteCodexPoolJobStatus`.
 *
 * @param parsed Decoded probe output, or `null` when the output could not be decoded.
 * @returns `null` when `parsed` is `null`; otherwise a status object in which an unrecognised
 *   `status` becomes `"unknown"` and every field of the wrong type becomes `null`.
 */
function normalizeRemoteJobStatus(parsed: Record<string, unknown> | null): RemoteCodexPoolJobStatus | null {
  if (parsed === null) return null;

  const numberOrNull = (value: unknown): number | null => (typeof value === "number" ? value : null);
  const stringOrNull = (value: unknown): string | null => (typeof value === "string" ? value : null);

  const rawStatus = parsed.status;
  const status =
    rawStatus === "running" || rawStatus === "succeeded" || rawStatus === "failed" || rawStatus === "unknown"
      ? rawStatus
      : "unknown";

  return {
    status,
    exitCode: numberOrNull(parsed.exitCode),
    stdout: stringOrNull(parsed.stdout),
    stderr: stringOrNull(parsed.stderr),
    stdoutBytes: numberOrNull(parsed.stdoutBytes),
    stderrBytes: numberOrNull(parsed.stderrBytes),
  };
}
|
||||
|
||||
/**
 * Builds the shell script that launches `script` on the remote host as a detached background job.
 *
 * The generated script:
 * 1. creates `remoteJobDir` (mode 700) and removes leftovers of a job with the same name,
 * 2. decodes the base64-embedded `script` into `<job>.sh`,
 * 3. runs it in a background subshell that ignores SIGHUP, redirecting output to
 *    `<job>.stdout` / `<job>.stderr`, then writes `<job>.exit`, then `<job>.done`, and finally
 *    deletes `<job>.sh`,
 * 4. records the subshell pid in `<job>.pid`, and
 * 5. prints one JSON object `{"ok": true, "jobName", "pid", "dir"}`.
 *
 * @param jobName Name used as the file-name stem for all job files.
 * @param script Shell script body to run; it is base64-encoded so its content needs no quoting.
 * @returns Shell source of the launcher.
 */
function remoteJobStartScript(jobName: string, script: string): string {
  const scriptB64 = Buffer.from(script, "utf8").toString("base64");
  // The final report uses a quoted heredoc and reads its values from the environment, so no
  // shell expansion or Python string-literal quoting is applied to the job name or directory.
  return `
set -eu
job=${shQuote(jobName)}
dir=${shQuote(remoteJobDir)}
mkdir -p "$dir"
chmod 700 "$dir"
script_path="$dir/$job.sh"
stdout_path="$dir/$job.stdout"
stderr_path="$dir/$job.stderr"
exit_path="$dir/$job.exit"
done_path="$dir/$job.done"
pid_path="$dir/$job.pid"
rm -f "$script_path" "$stdout_path" "$stderr_path" "$exit_path" "$done_path" "$pid_path"
base64 -d > "$script_path" <<'UNIDESK_REMOTE_SCRIPT_B64'
${scriptB64}
UNIDESK_REMOTE_SCRIPT_B64
chmod 700 "$script_path"
(
set +e
trap '' HUP
sh "$script_path" > "$stdout_path" 2> "$stderr_path"
code=$?
printf '%s' "$code" > "$exit_path"
date -u +%Y-%m-%dT%H:%M:%SZ > "$done_path"
rm -f "$script_path"
) >/dev/null 2>&1 &
pid=$!
printf '%s' "$pid" > "$pid_path"
UNIDESK_JOB_NAME="$job" UNIDESK_JOB_PID="$pid" UNIDESK_JOB_DIR="$dir" python3 - <<'PY'
import json
import os

print(json.dumps({
    "ok": True,
    "jobName": os.environ["UNIDESK_JOB_NAME"],
    "pid": int(os.environ["UNIDESK_JOB_PID"]),
    "dir": os.environ["UNIDESK_JOB_DIR"],
}))
PY
`;
}
|
||||
|
||||
/**
 * Builds the shell script that reports the state of a detached job started by
 * `remoteJobStartScript`.
 *
 * The embedded Python program inspects the job's files in `remoteJobDir` and prints one JSON
 * object with `ok`, `jobName`, `status`, `pid`, `exitCode`, `stdoutBytes`, `stderrBytes`,
 * `stdout`, `stderr` and `doneAt`. `status` is `succeeded` when the done marker exists and the
 * exit code is 0, `failed` when the done marker exists otherwise, `running` when the recorded
 * pid is alive, and `unknown` in every other case. Output text is limited to its tail: 12000
 * bytes while the job runs, 500000 (stdout) / 120000 (stderr) bytes once it is done.
 *
 * @param jobName Name of the job whose files are inspected.
 * @returns Shell source of the status probe.
 */
function remoteJobStatusScript(jobName: string): string {
  return `
set -eu
python3 - <<'PY'
import json
import os

job = ${JSON.stringify(jobName)}
directory = ${JSON.stringify(remoteJobDir)}
stdout_path = os.path.join(directory, job + ".stdout")
stderr_path = os.path.join(directory, job + ".stderr")
exit_path = os.path.join(directory, job + ".exit")
done_path = os.path.join(directory, job + ".done")
pid_path = os.path.join(directory, job + ".pid")


def read_text(path, limit=None):
    try:
        with open(path, "rb") as handle:
            if limit is not None:
                # Seek to the tail instead of loading the whole file.
                size = os.fstat(handle.fileno()).st_size
                if size > limit:
                    handle.seek(size - limit)
            data = handle.read()
    except FileNotFoundError:
        return None
    # The file may have grown between fstat and read; keep the limit exact.
    if limit is not None and len(data) > limit:
        data = data[-limit:]
    return data.decode("utf-8", errors="replace")


def file_size(path):
    try:
        return os.path.getsize(path)
    except FileNotFoundError:
        return 0


pid_text = read_text(pid_path)
pid = int(pid_text.strip()) if pid_text and pid_text.strip().isdigit() else None

# Probe liveness BEFORE looking for the done marker: the job writes the marker
# before it exits, so a process found dead here must already have left it.
# The opposite order can report a job that finishes in between as "unknown".
running = False
if pid is not None:
    try:
        os.kill(pid, 0)
        running = True
    except Exception:
        running = False

done = os.path.exists(done_path)
exit_text = read_text(exit_path)
exit_code = None
if exit_text is not None:
    try:
        exit_code = int(exit_text.strip())
    except ValueError:
        exit_code = None

if done:
    status = "succeeded" if exit_code == 0 else "failed"
elif running:
    status = "running"
else:
    status = "unknown"

print(json.dumps({
    "ok": True,
    "jobName": job,
    "status": status,
    "pid": pid,
    "exitCode": exit_code,
    "stdoutBytes": file_size(stdout_path),
    "stderrBytes": file_size(stderr_path),
    "stdout": read_text(stdout_path, 500000) if done else read_text(stdout_path, 12000),
    "stderr": read_text(stderr_path, 120000) if done else read_text(stderr_path, 12000),
    "doneAt": read_text(done_path),
}, ensure_ascii=False))
PY
`;
}
|
||||
|
||||
/**
 * Quotes a string for use as a single POSIX shell word.
 *
 * The value is wrapped in single quotes; each embedded single quote is emitted as `'\''`
 * (close the quote, an escaped quote, reopen the quote).
 *
 * @param value Text to quote; may be empty.
 * @returns The single-quoted shell word.
 */
function shQuote(value: string): string {
  const escaped = value.split("'").join("'\\''");
  return `'${escaped}'`;
}
|
||||
|
||||
/**
 * Resolves after roughly `ms` milliseconds, using `setTimeout`.
 *
 * @param ms Delay in milliseconds.
 * @returns A promise that resolves with no value once the timer fires.
 */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => {
    setTimeout(resolve, ms);
  });
}
|
||||
|
||||
function parseJsonOutput(stdout: string): Record<string, unknown> | null {
|
||||
const trimmed = stdout.trim();
|
||||
if (trimmed.length === 0) return null;
|
||||
|
||||
Reference in New Issue
Block a user