fix: 收敛 queue runner 失败终态与 CLI 生命周期

This commit is contained in:
Codex
2026-06-01 23:33:08 +08:00
parent b19143ad85
commit 5104d402c7
6 changed files with 196 additions and 9 deletions
+164 -4
View File
@@ -1,4 +1,7 @@
import { readFile } from "node:fs/promises";
import { mkdir, readFile, rm, writeFile } from "node:fs/promises";
import { spawn } from "node:child_process";
import { closeSync, existsSync, openSync } from "node:fs";
import path from "node:path";
import { startManagerServer } from "../../src/mgr/server.js";
import { MemoryAgentRunStore } from "../../src/mgr/store.js";
import { ManagerClient } from "../../src/mgr/client.js";
@@ -30,7 +33,9 @@ async function dispatch(args: ParsedArgs): Promise<JsonValue> {
const [group, command, id] = args.positional;
if (!group || group === "help") return help();
if (group === "server" && command === "start") return startServer(args);
if (group === "server" && command === "status") return client(args).get("/health/readiness");
if (group === "server" && command === "status") return serverStatus(args);
if (group === "server" && command === "logs") return serverLogs(args);
if (group === "server" && command === "stop") return stopServer(args);
if (group === "backends" && command === "list") return client(args).get("/api/v1/backends");
if (group === "secrets" && command === "codex" && id === "render") return renderCodexSecret(args);
if (group === "queue" && command === "submit") return submitQueueTask(args);
@@ -223,12 +228,91 @@ async function renderCodexSecret(args: ParsedArgs): Promise<JsonRecord> {
}
/**
 * Starts agentrun-mgr. With `--foreground` the server runs in this process;
 * otherwise a detached child running `server start --foreground` is spawned,
 * its stdout/stderr are appended to a per-start log file, and a pid file is
 * recorded so `server status|logs|stop` can find the process later.
 *
 * @param args Parsed CLI arguments (`--port`, `--host`, `--store`, `--foreground`).
 * @returns A summary of the background process: pid, base URL, pid file, log path, follow-up commands.
 * @throws AgentRunError `schema-invalid` when `--port` is not a usable TCP port,
 *   `infra-failed` when a server already appears to be running or the child could not be spawned.
 */
async function startServer(args: ParsedArgs): Promise<JsonRecord> {
  if (args.flags.get("foreground") === true) return startServerForeground(args);
  const port = Number(flag(args, "port", "8080"));
  // The port names the pid file and the log file, so NaN or out-of-range values must be rejected up front.
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new AgentRunError("schema-invalid", "--port must be an integer between 1 and 65535", { httpStatus: 2 });
  }
  const host = flag(args, "host", "0.0.0.0");
  const state = await readServerState(port);
  if (state.pidAlive || state.portListening) {
    throw new AgentRunError("infra-failed", `agentrun-mgr already appears to be running on port ${port}; use server status or server stop first`, { httpStatus: 409, details: state as JsonRecord });
  }
  // Derive the directory from the final log path: calling logDir() separately could
  // yield a different day directory if the clock crosses UTC midnight in between.
  const logPath = serverLogPath(port);
  await ensureDir(stateDir());
  await ensureDir(path.dirname(logPath));
  const argsForChild = [process.argv[1] ?? "scripts/agentrun-cli.ts", "server", "start", "--foreground", "--host", host, "--port", String(port)];
  const store = optionalFlag(args, "store");
  if (store) argsForChild.push("--store", store);
  const logFds: number[] = [];
  let child: ReturnType<typeof spawn>;
  try {
    logFds.push(openSync(logPath, "a"));
    logFds.push(openSync(logPath, "a"));
    child = spawn(process.execPath, argsForChild, {
      cwd: process.cwd(),
      env: process.env,
      detached: true,
      stdio: ["ignore", logFds[0], logFds[1]],
    });
  } finally {
    // The child holds its own duplicates; always release ours, even if spawn threw.
    for (const fd of logFds) closeSync(fd);
  }
  // A failed spawn is reported asynchronously through "error"; without a listener it
  // would surface as an uncaught exception. The missing pid below reports the failure.
  child.once("error", () => undefined);
  if (child.pid === undefined) {
    throw new AgentRunError("infra-failed", "failed to spawn the background agentrun-mgr process", { httpStatus: 500, details: { logPath } });
  }
  child.unref();
  const pidFile = pidFilePath(port);
  await writeFile(pidFile, JSON.stringify({ pid: child.pid, port, host, logPath, startedAt: new Date().toISOString() }) + "\n", "utf8");
  // A wildcard bind address is not connectable; advertise loopback instead.
  const localBaseUrl = `http://${host === "0.0.0.0" ? "127.0.0.1" : host}:${port}`;
  return { action: "server-start", mode: "background", serviceId: "agentrun-mgr", pid: child.pid, port, host, baseUrl: localBaseUrl, pidFile, logPath, pollCommands: { status: `./scripts/agentrun server status --port ${port}`, logs: `./scripts/agentrun server logs --port ${port}`, stop: `./scripts/agentrun server stop --port ${port}` } };
}
/**
 * Runs agentrun-mgr inside the current process and reports where it is listening.
 *
 * The store mode comes from `--store`, then `AGENTRUN_STORE`, then
 * `AGENTRUN_MGR_STORE`. Only the value "memory" selects an in-memory store;
 * for any other value no `store` option is passed to `startManagerServer`.
 *
 * @param args Parsed CLI arguments (`--port`, `--host`, `--store`).
 * @returns Service id, base URL, this process's pid, the store health report and the run mode.
 */
async function startServerForeground(args: ParsedArgs): Promise<JsonRecord> {
  const port = Number(flag(args, "port", "8080"));
  const host = flag(args, "host", "0.0.0.0");
  const storeMode = optionalFlag(args, "store") ?? process.env.AGENTRUN_STORE ?? process.env.AGENTRUN_MGR_STORE;
  const started = await startManagerServer({ port, host, ...(storeMode === "memory" ? { store: new MemoryAgentRunStore() } : {}) });
  const database = await started.store.health();
  // Single return: the earlier payload (without `mode`) made this statement unreachable.
  return { serviceId: "agentrun-mgr", baseUrl: started.baseUrl, pid: process.pid, database, mode: "foreground", note: "foreground process; use server start without --foreground for local background mode" };
}
/**
 * Reports the local process state for a manager port together with the result
 * of a readiness probe against the manager.
 *
 * With `--port` the probe targets loopback on that port and is skipped when
 * nothing is listening there; without it the configured manager URL is always probed.
 *
 * @param args Parsed CLI arguments (`--port`, plus whatever `managerUrl` reads).
 * @returns Local state, readiness payload or readiness failure, and follow-up commands.
 */
async function serverStatus(args: ParsedArgs): Promise<JsonRecord> {
  const requestedPort = optionalFlag(args, "port");
  let baseUrl: string;
  if (requestedPort) {
    baseUrl = `http://127.0.0.1:${Number(requestedPort)}`;
  } else {
    baseUrl = managerUrl(args);
  }
  const port = Number(requestedPort ?? portFromUrl(baseUrl));
  const local = await readServerState(port);

  let readiness: JsonValue = null;
  let readinessFailure: JsonRecord | null = null;
  if (!requestedPort || local.portListening) {
    try {
      const manager = new ManagerClient(baseUrl);
      readiness = await manager.get("/health/readiness");
    } catch (error) {
      readinessFailure = errorToJson(error);
    }
  }

  const pollCommands = {
    status: `./scripts/agentrun server status --port ${port}`,
    logs: `./scripts/agentrun server logs --port ${port}`,
    stop: `./scripts/agentrun server stop --port ${port}`,
  };
  return { action: "server-status", serviceId: "agentrun-mgr", port, baseUrl, local: local as JsonRecord, readiness, readinessFailure, pollCommands };
}
/**
 * Returns the tail of the manager log file for a port.
 *
 * The log path is `--log-file` when given, otherwise the path recorded in the
 * pid file for the port. At most `--tail-bytes` bytes (default 12000) from the
 * end of the file are returned, decoded as UTF-8.
 *
 * @param args Parsed CLI arguments (`--port`, `--log-file`, `--tail-bytes`).
 * @returns Log metadata and tail text; `exists: false` when no log is recorded or the file is missing.
 * @throws AgentRunError `schema-invalid` when `--tail-bytes` is not a non-negative integer.
 */
async function serverLogs(args: ParsedArgs): Promise<JsonRecord> {
  const port = Number(flag(args, "port", portFromManagerUrl(args)));
  const tailBytes = Number(flag(args, "tail-bytes", "12000"));
  // NaN would silently return the whole file and a negative value nothing at all.
  if (!Number.isInteger(tailBytes) || tailBytes < 0) {
    throw new AgentRunError("schema-invalid", "--tail-bytes must be a non-negative integer", { httpStatus: 2 });
  }
  const state = await readServerState(port);
  const logPath = optionalFlag(args, "log-file") ?? (typeof state.logPath === "string" ? state.logPath : null);
  if (!logPath) return { action: "server-logs", serviceId: "agentrun-mgr", port, logPath: null, exists: false, tail: "", bytes: 0, truncated: false, message: "no log file recorded for this port" };
  let bytes: Buffer;
  try {
    bytes = await readFile(logPath);
  } catch (error) {
    // Read first and classify afterwards: a separate existence check could race with log cleanup.
    if (error instanceof Error && (error as { code?: unknown }).code === "ENOENT") {
      return { action: "server-logs", serviceId: "agentrun-mgr", port, logPath, exists: false, tail: "", bytes: 0, truncated: false, message: "log file does not exist" };
    }
    throw error;
  }
  let start = Math.max(0, bytes.byteLength - tailBytes);
  // Do not begin inside a multi-byte UTF-8 sequence: skip continuation bytes (0b10xxxxxx).
  while (start > 0 && start < bytes.byteLength && ((bytes[start] ?? 0) & 0xc0) === 0x80) start += 1;
  return { action: "server-logs", serviceId: "agentrun-mgr", port, logPath, exists: true, bytes: bytes.byteLength, tailBytes, truncated: start > 0, tail: bytes.subarray(start).toString("utf8") };
}
/**
 * Asks the manager on a port to terminate and reports whether it went away.
 *
 * SIGTERM goes to the pid recorded in the pid file when that process is alive,
 * otherwise to whichever process is listening on the port. After a signal is
 * delivered the local state is polled for a bounded time, because a graceful
 * shutdown can take longer than a single short sleep. The pid file is removed
 * once neither the recorded pid nor the port is active.
 *
 * @param args Parsed CLI arguments (`--port`, plus whatever `portFromManagerUrl` reads).
 * @returns State before and after, whether a signal was sent, any signal failure, and `stopped`.
 */
async function stopServer(args: ParsedArgs): Promise<JsonRecord> {
  const pollIntervalMs = 250;
  const maxWaitMs = 5000;
  const port = Number(flag(args, "port", portFromManagerUrl(args)));
  const before = await readServerState(port);
  const beforePid = typeof before.pid === "number" ? before.pid : null;
  const beforePortPid = typeof before.portPid === "number" ? before.portPid : null;
  const targetPid = before.pidAlive === true && beforePid !== null ? beforePid : beforePortPid;
  let signalSent = false;
  let signalFailure: JsonRecord | null = null;
  if (targetPid !== null) {
    try {
      process.kill(targetPid, "SIGTERM");
      signalSent = true;
    } catch (error) {
      // The process may have exited since the state was read, or belong to another
      // user; report that in the result instead of crashing the command.
      signalFailure = errorToJson(error);
    }
  }
  let after = await (signalSent ? sleep(pollIntervalMs).then(() => readServerState(port)) : readServerState(port));
  if (signalSent) {
    const deadline = Date.now() + maxWaitMs;
    while ((after.pidAlive || after.portListening) && Date.now() < deadline) {
      await sleep(pollIntervalMs);
      after = await readServerState(port);
    }
  }
  const stopped = !after.pidAlive && !after.portListening;
  // `force: true` already tolerates a missing file, so no existence check is needed.
  if (stopped) await rm(pidFilePath(port), { force: true });
  return { action: "server-stop", serviceId: "agentrun-mgr", port, signalSent, signalFailure, before: before as JsonRecord, after: after as JsonRecord, stopped };
}
function client(args: ParsedArgs): ManagerClient {
@@ -239,6 +323,79 @@ function managerUrl(args: ParsedArgs): string {
return optionalFlag(args, "manager-url") ?? process.env.AGENTRUN_MGR_URL ?? "http://127.0.0.1:8080";
}
/** Resolves the manager URL for these arguments and returns its port as a string. */
function portFromManagerUrl(args: ParsedArgs): string {
  const resolvedUrl = managerUrl(args);
  return portFromUrl(resolvedUrl);
}
/**
 * Extracts the port of a URL as a string.
 *
 * When the URL carries no explicit port, "443" is returned for `https:` and
 * "80" for every other protocol. Text that cannot be parsed as a URL yields "8080".
 */
function portFromUrl(value: string): string {
  let parsed: URL;
  try {
    parsed = new URL(value);
  } catch {
    return "8080";
  }
  if (parsed.port) return parsed.port;
  if (parsed.protocol === "https:") return "443";
  return "80";
}
/** Directory for CLI state files: `.state` under the current working directory. */
function stateDir(): string {
  const workingDir = process.cwd();
  return path.join(workingDir, ".state");
}
/**
 * Directory for today's logs: `logs/<YYYYMMDD>` under the current working
 * directory, where the day is taken from the ISO (UTC) timestamp.
 */
function logDir(): string {
  const isoDay = new Date().toISOString().slice(0, 10);
  const compactDay = isoDay.replace(/-/gu, "");
  return path.join(process.cwd(), "logs", compactDay);
}
/** Path of the pid record for the manager on `port`, inside the state directory. */
function pidFilePath(port: number): string {
  const fileName = `agentrun-mgr-${port}.pid.json`;
  return path.join(stateDir(), fileName);
}
/**
 * Builds a fresh log file path for the manager on `port`: today's log directory
 * plus a name carrying the port and the current ISO timestamp, with `:` and `.`
 * replaced by `-`.
 */
function serverLogPath(port: number): string {
  const directory = logDir();
  const stamp = new Date().toISOString().replace(/[:.]/gu, "-");
  const fileName = `agentrun-mgr-${port}-${stamp}.jsonl`;
  return path.join(directory, fileName);
}
/** Creates `dir` and any missing parent directories. */
async function ensureDir(dir: string): Promise<void> {
  const options = { recursive: true };
  await mkdir(dir, options);
}
/**
 * Collects what is known locally about the manager on `port`: the pid file
 * contents (pid, log path, start time), whether that pid is alive, and which
 * pid, if any, is listening on the port.
 *
 * Fields missing from the pid file, or of an unexpected type, are reported as `null`.
 */
async function readServerState(port: number): Promise<JsonRecord> {
  const stringOrNull = (candidate: unknown): string | null => (typeof candidate === "string" ? candidate : null);

  const pidFile = pidFilePath(port);
  const record = await readPidFile(pidFile);
  const recordedPid = record?.pid;
  const pid = typeof recordedPid === "number" ? recordedPid : null;
  const pidAlive = pid === null ? false : isPidAlive(pid);
  const portPid = await pidForPort(port);

  return {
    pidFile,
    pid,
    pidAlive,
    port,
    portListening: portPid !== null,
    portPid,
    logPath: stringOrNull(record?.logPath),
    startedAt: stringOrNull(record?.startedAt),
  };
}
/**
 * Reads and parses a pid file.
 *
 * @param pidFile Path of the JSON pid file.
 * @returns The parsed record, or `null` when the file cannot be read, is not
 *   valid JSON, or does not contain a JSON object (arrays and primitives are rejected).
 */
async function readPidFile(pidFile: string): Promise<JsonRecord | null> {
  let parsed: unknown;
  try {
    parsed = JSON.parse(await readFile(pidFile, "utf8"));
  } catch {
    // A missing or corrupt pid file simply means "nothing recorded".
    return null;
  }
  // JSON.parse accepts any JSON value; only a plain object is a usable record.
  if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) return null;
  return parsed as JsonRecord;
}
/**
 * Reports whether a process with the given pid currently exists.
 *
 * @param pid Process id to probe; anything other than a positive integer is treated as "not alive".
 * @returns `true` when the process exists, including when it exists but may not be signalled by this user.
 */
function isPidAlive(pid: number): boolean {
  // Zero and negative pids address process groups, not a single process, so a
  // successful probe would say nothing about the recorded server.
  if (!Number.isInteger(pid) || pid <= 0) return false;
  try {
    // Signal 0 performs the existence and permission checks without delivering a signal.
    process.kill(pid, 0);
    return true;
  } catch (error) {
    // EPERM means the process exists but belongs to someone else; only ESRCH means "gone".
    return error instanceof Error && (error as { code?: unknown }).code === "EPERM";
  }
}
/**
 * Finds the pid of the process listening on a local TCP port by running
 * `ss -ltnp` with a source-port filter and taking the first `pid=<n>` in its output.
 *
 * @param port TCP port to look up.
 * @returns The listening pid, or `null` when the port is not a valid port number,
 *   `ss` is unavailable, or its output contains no pid.
 */
async function pidForPort(port: number): Promise<number | null> {
  if (!Number.isInteger(port) || port < 0 || port > 65535) return null;
  const output = await new Promise<string>((resolve) => {
    const chunks: Buffer[] = [];
    // Run ss directly with an argument vector: no shell is involved, and the filter
    // stays a single argument exactly as the quoted form would pass it.
    const proc = spawn("ss", ["-ltnp", `sport = :${port}`], { stdio: ["ignore", "pipe", "ignore"] });
    proc.stdout.on("data", (chunk: Buffer) => chunks.push(chunk));
    // A missing `ss` binary is reported through "error"; without a listener that
    // would be an uncaught exception. Treat it as "no information".
    proc.once("error", () => resolve(""));
    proc.once("close", () => resolve(Buffer.concat(chunks).toString("utf8")));
  });
  const match = /pid=(\d+)/u.exec(output);
  return match ? Number(match[1]) : null;
}
/** Resolves after roughly `ms` milliseconds. */
async function sleep(ms: number): Promise<void> {
  await new Promise<void>((wake) => {
    setTimeout(wake, ms);
  });
}
async function jsonFile(args: ParsedArgs): Promise<JsonRecord> {
const file = optionalFlag(args, "json-file");
if (!file) throw new AgentRunError("schema-invalid", "--json-file is required", { httpStatus: 2 });
@@ -316,7 +473,10 @@ function help(): JsonRecord {
"queue refresh <taskId>",
"secrets codex render --dry-run [--profile codex|deepseek] [--codex-home <dir>] [--namespace agentrun-v01] [--secret-name <name>]",
"backends list",
"server start|status",
"server start [--port <port>] [--host <host>] [--foreground]",
"server status [--port <port>]",
"server logs [--port <port>] [--tail-bytes <bytes>] [--log-file <path>]",
"server stop [--port <port>]",
],
};
}