diff --git a/scripts/src/platform-infra-sub2api-codex.ts b/scripts/src/platform-infra-sub2api-codex.ts index fcc2ca7b..97e8a03e 100644 --- a/scripts/src/platform-infra-sub2api-codex.ts +++ b/scripts/src/platform-infra-sub2api-codex.ts @@ -19,6 +19,9 @@ const defaultPoolApiKeySecretName = "sub2api-codex-pool-api-key"; const defaultPoolApiKeySecretKey = "API_KEY"; const defaultMinOwnerBalanceUsd = 1000; const defaultAccountPriority = 10; +const remoteJobDir = "/tmp/unidesk-platform-infra-sub2api-codex-pool"; +const remoteJobTimeoutMs = 15 * 60_000; +const remoteJobPollMs = 5_000; interface DisclosureOptions { full: boolean; @@ -296,7 +299,7 @@ async function codexPoolSync(config: UniDeskConfig, options: SyncOptions): Promi tempUnschedulableCredentials: renderSub2ApiTempUnschedulableCredentials(profile.tempUnschedulable), })), }; - const result = await capture(config, g14K3sRoute, ["script"], syncScript(payload, pool)); + const result = await runRemoteCodexPoolScript(config, "sync", syncScript(payload, pool)); const parsed = parseJsonOutput(result.stdout); if (options.raw) { return { @@ -322,7 +325,7 @@ async function codexPoolSync(config: UniDeskConfig, options: SyncOptions): Promi async function codexPoolValidate(config: UniDeskConfig, options: DisclosureOptions): Promise> { const pool = readCodexPoolConfig(); - const result = await capture(config, g14K3sRoute, ["script"], validateScript(pool)); + const result = await runRemoteCodexPoolScript(config, "validate", validateScript(pool)); const parsed = parseJsonOutput(result.stdout); if (options.raw) { return { @@ -3287,6 +3290,191 @@ async function capture(config: UniDeskConfig, target: string, args: string[], in return await runSshCommandCapture(config, target, args, input); } +async function runRemoteCodexPoolScript(config: UniDeskConfig, mode: "sync" | "validate", script: string): Promise { + const jobName = `codex-pool-${mode}-${Date.now().toString(36)}`.slice(0, 63); + const startedAtMs = Date.now(); + const start = await capture(config, g14K3sRoute, ["script"], remoteJobStartScript(jobName, script)); + const started = parseJsonOutput(start.stdout); + if (start.exitCode !== 0 || boolField(started, "ok", false) !== true) return start; + + let latest: RemoteCodexPoolJobStatus | null = null; + while (Date.now() - startedAtMs <= remoteJobTimeoutMs) { + await sleep(remoteJobPollMs); + const probe = await capture(config, g14K3sRoute, ["script"], remoteJobStatusScript(jobName)); + const parsed = parseJsonOutput(probe.stdout); + latest = normalizeRemoteJobStatus(parsed); + process.stderr.write(`${JSON.stringify({ + event: "platform-infra.sub2api.codex-pool.progress", + at: new Date().toISOString(), + mode, + jobName, + status: latest?.status ?? "unknown", + exitCode: latest?.exitCode ?? null, + elapsedMs: Date.now() - startedAtMs, + stdoutBytes: latest?.stdoutBytes ?? null, + stderrBytes: latest?.stderrBytes ?? null, + })}\n`); + if (probe.exitCode !== 0) { + return { + exitCode: probe.exitCode, + stdout: probe.stdout, + stderr: `${start.stderr}${probe.stderr}`, + }; + } + if (latest?.status === "succeeded" || latest?.status === "failed") { + return { + exitCode: latest.exitCode ?? (latest.status === "succeeded" ? 0 : 1), + stdout: latest.stdout ?? "", + stderr: latest.stderr ?? "", + }; + } + } + + return { + exitCode: 124, + stdout: latest?.stdout ?? "", + stderr: [ + latest?.stderr ?? "", + `remote codex-pool ${mode} job ${jobName} did not finish within ${remoteJobTimeoutMs}ms`, + `status command: bun scripts/cli.ts platform-infra sub2api codex-pool ${mode === "sync" ? "sync --confirm" : "validate"}`, + ].filter(Boolean).join("\n"), + }; +} + +interface RemoteCodexPoolJobStatus { + status: "running" | "succeeded" | "failed" | "unknown"; + exitCode: number | null; + stdout: string | null; + stderr: string | null; + stdoutBytes: number | null; + stderrBytes: number | null; +} + +function normalizeRemoteJobStatus(parsed: Record | null): RemoteCodexPoolJobStatus | null { + if (parsed === null) return null; + const status = parsed.status === "running" || parsed.status === "succeeded" || parsed.status === "failed" || parsed.status === "unknown" + ? parsed.status + : "unknown"; + return { + status, + exitCode: typeof parsed.exitCode === "number" ? parsed.exitCode : null, + stdout: typeof parsed.stdout === "string" ? parsed.stdout : null, + stderr: typeof parsed.stderr === "string" ? parsed.stderr : null, + stdoutBytes: typeof parsed.stdoutBytes === "number" ? parsed.stdoutBytes : null, + stderrBytes: typeof parsed.stderrBytes === "number" ? parsed.stderrBytes : null, + }; +} + +function remoteJobStartScript(jobName: string, script: string): string { + const scriptB64 = Buffer.from(script, "utf8").toString("base64"); + return ` +set -eu +job=${shQuote(jobName)} +dir=${shQuote(remoteJobDir)} +mkdir -p "$dir" +chmod 700 "$dir" +script_path="$dir/$job.sh" +stdout_path="$dir/$job.stdout" +stderr_path="$dir/$job.stderr" +exit_path="$dir/$job.exit" +done_path="$dir/$job.done" +pid_path="$dir/$job.pid" +rm -f "$script_path" "$stdout_path" "$stderr_path" "$exit_path" "$done_path" "$pid_path" +base64 -d > "$script_path" <<'UNIDESK_REMOTE_SCRIPT_B64' +${scriptB64} +UNIDESK_REMOTE_SCRIPT_B64 +chmod 700 "$script_path" +( + set +e + trap '' HUP + sh "$script_path" > "$stdout_path" 2> "$stderr_path" + code=$? + printf '%s' "$code" > "$exit_path" + date -u +%Y-%m-%dT%H:%M:%SZ > "$done_path" + rm -f "$script_path" +) >/dev/null 2>&1 & +pid=$! +printf '%s' "$pid" > "$pid_path" +python3 - < limit: + data = data[-limit:] + return data.decode("utf-8", errors="replace") + +def file_size(path): + try: + return os.path.getsize(path) + except FileNotFoundError: + return 0 + +pid_text = read_text(pid_path) +done = os.path.exists(done_path) +exit_text = read_text(exit_path) +exit_code = None +if exit_text is not None: + try: + exit_code = int(exit_text.strip()) + except ValueError: + exit_code = None +running = False +if pid_text is not None and not done: + try: + os.kill(int(pid_text.strip()), 0) + running = True + except Exception: + running = False +status = "succeeded" if done and exit_code == 0 else "failed" if done else "running" if running else "unknown" +print(json.dumps({ + "ok": True, + "jobName": job, + "status": status, + "pid": int(pid_text.strip()) if pid_text and pid_text.strip().isdigit() else None, + "exitCode": exit_code, + "stdoutBytes": file_size(stdout_path), + "stderrBytes": file_size(stderr_path), + "stdout": read_text(stdout_path, 500000) if done else read_text(stdout_path, 12000), + "stderr": read_text(stderr_path, 120000) if done else read_text(stderr_path, 12000), + "doneAt": read_text(done_path), +}, ensure_ascii=False)) +PY +`; +} + +function shQuote(value: string): string { + return `'${value.replace(/'/gu, `'\\''`)}'`; +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + function parseJsonOutput(stdout: string): Record | null { const trimmed = stdout.trim(); if (trimmed.length === 0) return null;