feat: 补充 runner Job dry-run 骨架
This commit is contained in:
@@ -108,7 +108,7 @@ POST /api/v1/commands/:commandId/ack
|
||||
| 规格项 | 状态 | 说明 |
|
||||
| --- | --- | --- |
|
||||
| `agentrun-mgr` 服务规格 | 已定义 | 本文为 v0.1 manager 权威。 |
|
||||
| Manager REST API | 未实现 | 需要后续代码实现。 |
|
||||
| Manager REST API | 部分实现 | 内存 self-test server 已覆盖公共 run/command/event/backends 和 runner register/claim/lease/poll/ack/event/status 骨架;生产 Postgres durable adapter 尚未接入。 |
|
||||
| Tenant policy boundary | 已定义/未实现 | v0.1 只做最小 schema/allowlist/secretScope 边界。 |
|
||||
| Postgres durable adapter | 未实现 | 见 [spec-v01-postgres.md](spec-v01-postgres.md)。 |
|
||||
| Observability 最小合同 | 已定义/未实现 | event、terminal status、failureKind 和 redaction 需要代码实现。 |
|
||||
| Observability 最小合同 | 部分实现 | 内存路径已输出非空 JSON、runner claim/lease/backend/terminal events、failureKind 和 Secret redaction 自测试;持久化和集群日志联调待补齐。 |
|
||||
|
||||
@@ -99,7 +99,7 @@ Runner 日志必须实时 flush 到文件或 pod log,CLI 启动 runner 时必
|
||||
| 规格项 | 状态 | 说明 |
|
||||
| --- | --- | --- |
|
||||
| `agentrun-runner` 服务规格 | 已定义 | 本文为 v0.1 runner 权威。 |
|
||||
| Kubernetes Job runner | 未实现 | 需要后续 runtime/GitOps 实现。 |
|
||||
| host process runner | 未实现 | 可作为 MVP 手动 dispatch,但仍必须走 manager API。 |
|
||||
| claim/lease/report client | 未实现 | 需要后续代码实现。 |
|
||||
| Kubernetes Job runner | 部分实现 | 已提供 `runner job --dry-run` Job manifest 渲染骨架,固定使用 `agentrun-v01-runner` ServiceAccount、manager URL、runId/commandId/attemptId、executionPolicy 和 SecretRef 文件投影;尚未执行真实 Kubernetes create/apply。 |
|
||||
| host process runner | 部分实现 | `runner start` 和 `src/runner/main.ts` 进入同一套 `runOnce`,可通过 manager register/claim/poll/report 执行自测试。 |
|
||||
| claim/lease/report client | 部分实现 | 已拆出 runner manager API client,覆盖 register、claim、lease heartbeat、poll command、ack、append event 和 terminal status;durable store 仍待 Postgres adapter 接入。 |
|
||||
| runner redaction | 已定义/未实现 | 需与 backend adapter 共同实现。 |
|
||||
|
||||
@@ -72,7 +72,7 @@ bun scripts/agentrun-cli.ts server start|status|stop|logs
|
||||
| 规格项 | 状态 | 说明 |
|
||||
| --- | --- | --- |
|
||||
| AgentRun CLI 规格 | 已定义 | 本文为 v0.1 CLI 权威。 |
|
||||
| `scripts/agentrun-cli.ts` | 未实现 | 需要后续代码实现。 |
|
||||
| CLI 调 manager REST | 未实现 | 需随 manager API 实现。 |
|
||||
| runner start | 未实现 | 需随 runner Job/host process 实现。 |
|
||||
| `scripts/agentrun-cli.ts` | 部分实现 | 已提供 run/command/event/backend/server 基础命令和 JSON envelope。 |
|
||||
| CLI 调 manager REST | 部分实现 | CLI 通过 `ManagerClient` 调 manager REST;当前自测试使用内存 manager。 |
|
||||
| runner start | 部分实现 | `runner start` 可执行 host process runner;`runner job --dry-run` 可渲染 Kubernetes Job JSON,尚不执行 create/apply。 |
|
||||
| CLI 测试规格 | 已定义 | 综合联调见 [spec-v01-validation.md](spec-v01-validation.md)。 |
|
||||
|
||||
@@ -132,7 +132,7 @@ Secret 创建和轮换不由 source branch 自动生成;source branch 只声
|
||||
| 规格项 | 状态 | 说明 |
|
||||
| --- | --- | --- |
|
||||
| Secret 分发规格 | 已定义 | 本文为 v0.1 provider credential 分发权威。 |
|
||||
| Kubernetes SecretRef 注入 | 未实现 | 需要后续 GitOps/runtime 实现。 |
|
||||
| Codex auth/config file projection | 未实现 | 需要后续 runner/backend adapter 实现,测试来源为 `~/.codex/auth.json` 和 `~/.codex/config.toml`。 |
|
||||
| redaction 最小规则 | 已定义 | 需要后续代码实现和测试。 |
|
||||
| Kubernetes SecretRef 注入 | 部分实现 | runner Job dry-run 渲染已按 run `executionPolicy.secretScope.providerCredentials` 生成 Secret volume projection 和 `CODEX_HOME`,但尚未 apply 到集群。 |
|
||||
| Codex auth/config file projection | 部分实现 | backend readiness 检查 `auth.json`/`config.toml` 可读性,缺失时返回 `secret-unavailable`;self-test 使用临时文件模拟投影。 |
|
||||
| redaction 最小规则 | 部分实现 | event、Job dry-run 输出和 self-test 已验证不打印测试 token;复杂审计仍待后续补齐。 |
|
||||
| 外部 secret manager | 未采用 | 如需 Vault/ExternalSecrets/SOPS,后续单独更新规格。 |
|
||||
|
||||
@@ -154,7 +154,7 @@ Manager 负责校验、保存和返回这些字段;runner 只能消费已保
|
||||
| Codex backend 规格 | 已定义 | 见 [spec-v01-backend-codex.md](spec-v01-backend-codex.md)。 |
|
||||
| AgentRun CLI 规格 | 已定义 | 见 [spec-v01-cli.md](spec-v01-cli.md)。 |
|
||||
| Scheduler deferred 规格 | 已定义 | 见 [spec-v01-scheduler.md](spec-v01-scheduler.md)。 |
|
||||
| `agentrun-mgr` 实现 | 未实现 | 需后续代码实现。 |
|
||||
| `agentrun-runner` 实现 | 未实现 | 需后续代码实现。 |
|
||||
| `agentrun-mgr` 实现 | 部分实现 | 已有内存 REST 骨架覆盖 run/command/event 与 runner claim/lease/report;Postgres durable store 仍待接入。 |
|
||||
| `agentrun-runner` 实现 | 部分实现 | 已有 host process runner 与 Kubernetes Job dry-run 渲染骨架,runner 通过 manager API claim/poll/report,不直连 Postgres。 |
|
||||
| 第一真实 backend | 未实现 | 默认候选 Codex。 |
|
||||
| 自动 scheduler | Deferred | 不作为 `v0.1` 第一阶段验收目标。 |
|
||||
|
||||
+31
-1
@@ -2,7 +2,8 @@ import { readFile } from "node:fs/promises";
|
||||
import { startManagerServer } from "../../src/mgr/server.js";
|
||||
import { ManagerClient } from "../../src/mgr/client.js";
|
||||
import { runOnce } from "../../src/runner/run-once.js";
|
||||
import type { JsonRecord, JsonValue } from "../../src/common/types.js";
|
||||
import { renderRunnerJobDryRun } from "../../src/runner/k8s-job.js";
|
||||
import type { JsonRecord, JsonValue, RunRecord } from "../../src/common/types.js";
|
||||
import { AgentRunError, errorToJson } from "../../src/common/errors.js";
|
||||
import type { RunnerOnceOptions } from "../../src/runner/run-once.js";
|
||||
|
||||
@@ -58,9 +59,37 @@ async function dispatch(args: ParsedArgs): Promise<JsonValue> {
|
||||
if (codexHome) options.codexHome = codexHome;
|
||||
return runOnce(options) as unknown as JsonValue;
|
||||
}
|
||||
if (group === "runner" && command === "job") return renderRunnerJob(args);
|
||||
throw new AgentRunError("schema-invalid", `unsupported command: ${args.positional.join(" ")}`, { httpStatus: 2 });
|
||||
}
|
||||
|
||||
async function renderRunnerJob(args: ParsedArgs): Promise<JsonRecord> {
|
||||
if (args.flags.get("dry-run") !== true) throw new AgentRunError("schema-invalid", "runner job only supports --dry-run in v0.1", { httpStatus: 2 });
|
||||
const runId = flag(args, "run-id", "");
|
||||
const commandId = flag(args, "command-id", "");
|
||||
const image = flag(args, "image", "");
|
||||
if (!runId) throw new AgentRunError("schema-invalid", "runner job requires --run-id", { httpStatus: 2 });
|
||||
if (!commandId) throw new AgentRunError("schema-invalid", "runner job requires --command-id", { httpStatus: 2 });
|
||||
if (!image) throw new AgentRunError("schema-invalid", "runner job requires --image", { httpStatus: 2 });
|
||||
const run = await client(args).get(`/api/v1/runs/${encodeURIComponent(runId)}`) as RunRecord;
|
||||
const options = {
|
||||
run,
|
||||
commandId,
|
||||
image,
|
||||
managerUrl: managerUrl(args),
|
||||
namespace: optionalFlag(args, "namespace") ?? "agentrun-v01",
|
||||
};
|
||||
const attemptId = optionalFlag(args, "attempt-id");
|
||||
const runnerId = optionalFlag(args, "runner-id");
|
||||
const sourceCommit = optionalFlag(args, "source-commit");
|
||||
return renderRunnerJobDryRun({
|
||||
...options,
|
||||
...(attemptId ? { attemptId } : {}),
|
||||
...(runnerId ? { runnerId } : {}),
|
||||
...(sourceCommit ? { sourceCommit } : {}),
|
||||
});
|
||||
}
|
||||
|
||||
async function startServer(args: ParsedArgs): Promise<JsonRecord> {
|
||||
const port = Number(flag(args, "port", "8080"));
|
||||
const host = flag(args, "host", "0.0.0.0");
|
||||
@@ -123,6 +152,7 @@ function help(): JsonRecord {
|
||||
"commands create <runId> --type turn --json-file <payload.json>",
|
||||
"commands show <commandId> --run-id <runId>",
|
||||
"runner start --run-id <runId>",
|
||||
"runner job --dry-run --run-id <runId> --command-id <commandId> --image <image>",
|
||||
"backends list",
|
||||
"server start|status",
|
||||
],
|
||||
|
||||
@@ -77,6 +77,13 @@ async function route({ method, url, body, store, sourceCommit }: { method: strin
|
||||
if (runnerId.length === 0) throw new AgentRunError("schema-invalid", "runnerId is required", { httpStatus: 400 });
|
||||
return store.claimRun(claimMatch[1] ?? "", runnerId, numberField(record, "leaseMs", 60_000)) as unknown as JsonValue;
|
||||
}
|
||||
const leaseMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/lease$/u);
|
||||
if (method === "PATCH" && leaseMatch) {
|
||||
const record = asRecord(body, "lease");
|
||||
const runnerId = typeof record.runnerId === "string" ? record.runnerId : "";
|
||||
if (runnerId.length === 0) throw new AgentRunError("schema-invalid", "runnerId is required", { httpStatus: 400 });
|
||||
return store.heartbeat(leaseMatch[1] ?? "", runnerId, numberField(record, "leaseMs", 60_000)) as unknown as JsonValue;
|
||||
}
|
||||
const eventsAppendMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/events$/u);
|
||||
if (method === "POST" && eventsAppendMatch) {
|
||||
const record = asRecord(body, "event");
|
||||
|
||||
@@ -0,0 +1,191 @@
|
||||
import { stableHash } from "../common/validation.js";
|
||||
import type { BackendProfile, ExecutionPolicy, JsonRecord, JsonValue, RunRecord, SecretRef } from "../common/types.js";
|
||||
|
||||
export interface RunnerJobRenderOptions {
|
||||
run: RunRecord;
|
||||
commandId: string;
|
||||
managerUrl: string;
|
||||
image: string;
|
||||
namespace?: string;
|
||||
attemptId?: string;
|
||||
runnerId?: string;
|
||||
sourceCommit?: string;
|
||||
serviceAccountName?: string;
|
||||
imagePullPolicy?: string;
|
||||
backoffLimit?: number;
|
||||
ttlSecondsAfterFinished?: number;
|
||||
}
|
||||
|
||||
interface CredentialProjection {
|
||||
profile: BackendProfile | string;
|
||||
secretRef: SecretRef;
|
||||
volumeName: string;
|
||||
mountPath: string;
|
||||
}
|
||||
|
||||
export function renderRunnerJobDryRun(options: RunnerJobRenderOptions): JsonRecord {
|
||||
const render = renderRunnerJobManifest(options);
|
||||
return {
|
||||
dryRun: true,
|
||||
mutation: false,
|
||||
action: "render-kubernetes-job",
|
||||
jobIdentity: {
|
||||
kind: "Job",
|
||||
namespace: render.namespace,
|
||||
name: render.jobName,
|
||||
serviceAccountName: render.serviceAccountName,
|
||||
},
|
||||
runner: {
|
||||
runId: options.run.id,
|
||||
commandId: options.commandId,
|
||||
attemptId: render.attemptId,
|
||||
runnerId: render.runnerId,
|
||||
backendProfile: options.run.backendProfile,
|
||||
managerUrl: options.managerUrl,
|
||||
sourceCommit: render.sourceCommit,
|
||||
},
|
||||
secretRefs: render.secretRefs.map((item) => ({ profile: item.profile, name: item.secretRef.name, namespace: item.secretRef.namespace ?? render.namespace, keys: item.secretRef.keys ?? [], mountPath: item.mountPath, valuesPrinted: false })),
|
||||
pollCommands: {
|
||||
run: `bun scripts/agentrun-cli.ts runs show ${options.run.id} --manager-url ${options.managerUrl}`,
|
||||
events: `bun scripts/agentrun-cli.ts runs events ${options.run.id} --manager-url ${options.managerUrl} --after-seq 0 --limit 100`,
|
||||
},
|
||||
warnings: render.warnings,
|
||||
manifest: render.manifest,
|
||||
};
|
||||
}
|
||||
|
||||
export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { manifest: JsonRecord; namespace: string; jobName: string; runnerId: string; attemptId: string; sourceCommit: string; serviceAccountName: string; secretRefs: CredentialProjection[]; warnings: string[] } {
|
||||
const namespace = options.namespace ?? "agentrun-v01";
|
||||
const attemptId = options.attemptId ?? `attempt_${Date.now().toString(36)}`;
|
||||
const runnerId = options.runnerId ?? `runner_${shortHash(`${options.run.id}:${attemptId}:${options.commandId}`)}`;
|
||||
const sourceCommit = options.sourceCommit ?? process.env.AGENTRUN_SOURCE_COMMIT ?? "unknown";
|
||||
const serviceAccountName = options.serviceAccountName ?? "agentrun-v01-runner";
|
||||
const jobName = `agentrun-v01-runner-${shortDnsHash(options.run.id, attemptId)}`;
|
||||
const secretRefs = credentialProjections(options.run, namespace);
|
||||
const warnings: string[] = [];
|
||||
if (secretRefs.length === 0) warnings.push("run executionPolicy.secretScope 未声明 provider SecretRef;runner 将按 secret-unavailable 上报,而不会降级直连外部凭据");
|
||||
const env = runnerEnv(options, { namespace, jobName, runnerId, attemptId, sourceCommit, secretRefs });
|
||||
const manifest: JsonRecord = {
|
||||
apiVersion: "batch/v1",
|
||||
kind: "Job",
|
||||
metadata: {
|
||||
name: jobName,
|
||||
namespace,
|
||||
labels: labels(options.run, jobName),
|
||||
annotations: {
|
||||
"agentrun.pikastech.local/run-id": options.run.id,
|
||||
"agentrun.pikastech.local/command-id": options.commandId,
|
||||
"agentrun.pikastech.local/dry-run-render": "true",
|
||||
},
|
||||
},
|
||||
spec: {
|
||||
backoffLimit: options.backoffLimit ?? 0,
|
||||
ttlSecondsAfterFinished: options.ttlSecondsAfterFinished ?? 86_400,
|
||||
template: {
|
||||
metadata: {
|
||||
labels: labels(options.run, jobName),
|
||||
annotations: {
|
||||
"agentrun.pikastech.local/run-id": options.run.id,
|
||||
"agentrun.pikastech.local/command-id": options.commandId,
|
||||
},
|
||||
},
|
||||
spec: {
|
||||
serviceAccountName,
|
||||
automountServiceAccountToken: false,
|
||||
restartPolicy: "Never",
|
||||
containers: [
|
||||
{
|
||||
name: "runner",
|
||||
image: options.image,
|
||||
imagePullPolicy: options.imagePullPolicy ?? "IfNotPresent",
|
||||
command: ["bun", "src/runner/main.ts"],
|
||||
env,
|
||||
volumeMounts: secretRefs.map((item) => ({ name: item.volumeName, mountPath: item.mountPath, readOnly: true })),
|
||||
resources: {
|
||||
requests: { cpu: "250m", memory: "512Mi" },
|
||||
limits: { cpu: "2", memory: "4Gi" },
|
||||
},
|
||||
securityContext: {
|
||||
allowPrivilegeEscalation: false,
|
||||
readOnlyRootFilesystem: false,
|
||||
capabilities: { drop: ["ALL"] },
|
||||
},
|
||||
},
|
||||
],
|
||||
volumes: secretRefs.map(secretVolume),
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
return { manifest, namespace, jobName, runnerId, attemptId, sourceCommit, serviceAccountName, secretRefs, warnings };
|
||||
}
|
||||
|
||||
function runnerEnv(options: RunnerJobRenderOptions, context: { namespace: string; jobName: string; runnerId: string; attemptId: string; sourceCommit: string; secretRefs: CredentialProjection[] }): JsonRecord[] {
|
||||
const codexMount = context.secretRefs.find((item) => item.profile === "codex")?.mountPath ?? "/home/agentrun/.codex";
|
||||
return [
|
||||
{ name: "AGENTRUN_MGR_URL", value: options.managerUrl },
|
||||
{ name: "AGENTRUN_RUN_ID", value: options.run.id },
|
||||
{ name: "AGENTRUN_COMMAND_ID", value: options.commandId },
|
||||
{ name: "AGENTRUN_ATTEMPT_ID", value: context.attemptId },
|
||||
{ name: "AGENTRUN_RUNNER_ID", value: context.runnerId },
|
||||
{ name: "AGENTRUN_BACKEND_PROFILE", value: options.run.backendProfile },
|
||||
{ name: "AGENTRUN_EXECUTION_POLICY_JSON", value: JSON.stringify(options.run.executionPolicy) },
|
||||
{ name: "AGENTRUN_SOURCE_COMMIT", value: context.sourceCommit },
|
||||
{ name: "AGENTRUN_RUNTIME_NAMESPACE", value: context.namespace },
|
||||
{ name: "AGENTRUN_K8S_JOB_NAME", value: context.jobName },
|
||||
{ name: "AGENTRUN_LOG_PATH", value: "/tmp/agentrun-runner.jsonl" },
|
||||
{ name: "HOME", value: "/home/agentrun" },
|
||||
{ name: "CODEX_HOME", value: codexMount },
|
||||
];
|
||||
}
|
||||
|
||||
function credentialProjections(run: RunRecord, namespace: string): CredentialProjection[] {
|
||||
const policy: ExecutionPolicy = run.executionPolicy;
|
||||
const credentials = policy.secretScope.providerCredentials ?? [];
|
||||
return credentials.map((item, index) => ({
|
||||
profile: item.profile,
|
||||
secretRef: item.secretRef.namespace ? item.secretRef : { ...item.secretRef, namespace },
|
||||
volumeName: sanitizeVolumeName(`${String(item.profile)}-${index}`),
|
||||
mountPath: normalizeMountPath(item.secretRef.mountPath),
|
||||
}));
|
||||
}
|
||||
|
||||
function secretVolume(item: CredentialProjection): JsonRecord {
|
||||
const secret: JsonRecord = {
|
||||
secretName: item.secretRef.name,
|
||||
defaultMode: 256,
|
||||
};
|
||||
const keys = item.secretRef.keys ?? [];
|
||||
if (keys.length > 0) secret.items = keys.map((key) => ({ key, path: key, mode: 256 }));
|
||||
return { name: item.volumeName, secret };
|
||||
}
|
||||
|
||||
function normalizeMountPath(value: string | undefined): string {
|
||||
if (!value || value === "~/.codex") return "/home/agentrun/.codex";
|
||||
if (value.startsWith("~/")) return `/home/agentrun/${value.slice(2)}`;
|
||||
return value;
|
||||
}
|
||||
|
||||
function labels(run: RunRecord, jobName: string): JsonRecord {
|
||||
return {
|
||||
"app.kubernetes.io/name": "agentrun-runner",
|
||||
"app.kubernetes.io/component": "runner",
|
||||
"app.kubernetes.io/part-of": "agentrun",
|
||||
"agentrun.pikastech.local/lane": "v0.1",
|
||||
"agentrun.pikastech.local/run-hash": shortHash(run.id),
|
||||
"job-name": jobName,
|
||||
};
|
||||
}
|
||||
|
||||
function shortDnsHash(...parts: string[]): string {
|
||||
return shortHash(parts.join(":"));
|
||||
}
|
||||
|
||||
function shortHash(value: JsonValue): string {
|
||||
return stableHash(value).slice(0, 12);
|
||||
}
|
||||
|
||||
function sanitizeVolumeName(value: string): string {
|
||||
const sanitized = value.toLowerCase().replace(/[^a-z0-9-]+/gu, "-").replace(/^-+|-+$/gu, "");
|
||||
return sanitized.length > 0 ? sanitized.slice(0, 40) : "credential";
|
||||
}
|
||||
+18
-2
@@ -1,4 +1,6 @@
|
||||
import { runOnce, type RunnerOnceOptions } from "./run-once.js";
|
||||
import { AgentRunError, errorToJson } from "../common/errors.js";
|
||||
import { failureKindFromError } from "./manager-api.js";
|
||||
|
||||
const managerUrl = process.env.AGENTRUN_MGR_URL;
|
||||
const runId = process.env.AGENTRUN_RUN_ID;
|
||||
@@ -11,9 +13,23 @@ const options: RunnerOnceOptions = {
|
||||
managerUrl,
|
||||
runId,
|
||||
};
|
||||
if (process.env.AGENTRUN_COMMAND_ID) options.commandId = process.env.AGENTRUN_COMMAND_ID;
|
||||
if (process.env.AGENTRUN_ATTEMPT_ID) options.attemptId = process.env.AGENTRUN_ATTEMPT_ID;
|
||||
if (process.env.AGENTRUN_RUNNER_ID) options.runnerId = process.env.AGENTRUN_RUNNER_ID;
|
||||
if (process.env.AGENTRUN_BACKEND_PROFILE === "codex") options.backendProfile = "codex";
|
||||
if (process.env.AGENTRUN_K8S_JOB_NAME) options.placement = "kubernetes-job";
|
||||
if (process.env.AGENTRUN_SOURCE_COMMIT) options.sourceCommit = process.env.AGENTRUN_SOURCE_COMMIT;
|
||||
if (process.env.AGENTRUN_K8S_JOB_NAME) options.jobName = process.env.AGENTRUN_K8S_JOB_NAME;
|
||||
if (process.env.HOSTNAME) options.podName = process.env.HOSTNAME;
|
||||
if (process.env.AGENTRUN_LOG_PATH) options.logPath = process.env.AGENTRUN_LOG_PATH;
|
||||
if (process.env.AGENTRUN_CODEX_COMMAND) options.codexCommand = process.env.AGENTRUN_CODEX_COMMAND;
|
||||
if (process.env.AGENTRUN_CODEX_ARGS) options.codexArgs = JSON.parse(process.env.AGENTRUN_CODEX_ARGS) as string[];
|
||||
if (process.env.CODEX_HOME) options.codexHome = process.env.CODEX_HOME;
|
||||
const result = await runOnce(options);
|
||||
console.log(JSON.stringify({ ok: true, data: result }));
|
||||
try {
|
||||
const result = await runOnce(options);
|
||||
console.log(JSON.stringify({ ok: true, data: result }));
|
||||
} catch (error) {
|
||||
const failureKind = failureKindFromError(error);
|
||||
console.log(JSON.stringify({ ok: false, failureKind, message: error instanceof Error ? error.message : String(error), error: errorToJson(error) }));
|
||||
process.exit(error instanceof AgentRunError && error.httpStatus >= 1 && error.httpStatus <= 255 ? error.httpStatus : 1);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,108 @@
|
||||
import { ManagerClient } from "../mgr/client.js";
|
||||
import { AgentRunError } from "../common/errors.js";
|
||||
import type { BackendEvent, BackendProfile, CommandRecord, FailureKind, JsonRecord, RunRecord, RunnerRecord, TerminalStatus } from "../common/types.js";
|
||||
|
||||
export interface RunnerRegistrationInput {
|
||||
runId: string;
|
||||
attemptId: string;
|
||||
backendProfile: BackendProfile;
|
||||
placement: "host-process" | "kubernetes-job";
|
||||
sourceCommit: string;
|
||||
runnerId?: string;
|
||||
jobName?: string;
|
||||
podName?: string;
|
||||
logPath?: string;
|
||||
}
|
||||
|
||||
export interface PollCommandsResult {
|
||||
items: CommandRecord[];
|
||||
selected: CommandRecord | null;
|
||||
}
|
||||
|
||||
export interface RunnerFailureReport {
|
||||
terminalStatus: TerminalStatus;
|
||||
failureKind: FailureKind;
|
||||
failureMessage: string;
|
||||
}
|
||||
|
||||
export class RunnerManagerApi {
|
||||
readonly client: ManagerClient;
|
||||
|
||||
constructor(readonly managerUrl: string) {
|
||||
this.client = new ManagerClient(managerUrl);
|
||||
}
|
||||
|
||||
async register(input: RunnerRegistrationInput): Promise<RunnerRecord> {
|
||||
const body: JsonRecord = {
|
||||
runId: input.runId,
|
||||
attemptId: input.attemptId,
|
||||
backendProfile: input.backendProfile,
|
||||
placement: input.placement,
|
||||
sourceCommit: input.sourceCommit,
|
||||
};
|
||||
if (input.runnerId) body.id = input.runnerId;
|
||||
if (input.jobName) body.jobName = input.jobName;
|
||||
if (input.podName) body.podName = input.podName;
|
||||
if (input.logPath) body.logPath = input.logPath;
|
||||
return await this.client.post("/api/v1/runners/register", body) as RunnerRecord;
|
||||
}
|
||||
|
||||
async claim(runId: string, runnerId: string, leaseMs: number): Promise<RunRecord> {
|
||||
return await this.client.post(`/api/v1/runs/${encodeURIComponent(runId)}/claim`, { runnerId, leaseMs }) as RunRecord;
|
||||
}
|
||||
|
||||
async heartbeat(runId: string, runnerId: string, leaseMs: number): Promise<RunRecord> {
|
||||
return await this.client.patch(`/api/v1/runs/${encodeURIComponent(runId)}/lease`, { runnerId, leaseMs }) as RunRecord;
|
||||
}
|
||||
|
||||
async pollCommands(runId: string, options: { afterSeq?: number; limit?: number; commandId?: string }): Promise<PollCommandsResult> {
|
||||
const afterSeq = options.afterSeq ?? 0;
|
||||
const limit = options.limit ?? 20;
|
||||
const response = await this.client.get(`/api/v1/runs/${encodeURIComponent(runId)}/commands?afterSeq=${afterSeq}&limit=${limit}`) as { items?: CommandRecord[] };
|
||||
const items = Array.isArray(response.items) ? response.items : [];
|
||||
const selected = options.commandId ? items.find((item) => item.id === options.commandId && item.state === "pending" && item.type === "turn") ?? null : items.find((item) => item.state === "pending" && item.type === "turn") ?? null;
|
||||
return { items, selected };
|
||||
}
|
||||
|
||||
async ackCommand(commandId: string): Promise<CommandRecord> {
|
||||
return await this.client.post(`/api/v1/commands/${encodeURIComponent(commandId)}/ack`, {}) as CommandRecord;
|
||||
}
|
||||
|
||||
async appendEvent(runId: string, event: BackendEvent): Promise<JsonRecord> {
|
||||
return await this.client.post(`/api/v1/runs/${encodeURIComponent(runId)}/events`, event as unknown as JsonRecord) as JsonRecord;
|
||||
}
|
||||
|
||||
async reportStatus(runId: string, report: { terminalStatus: TerminalStatus; failureKind: FailureKind | null; failureMessage: string | null }): Promise<RunRecord> {
|
||||
return await this.client.patch(`/api/v1/runs/${encodeURIComponent(runId)}/status`, report as unknown as JsonRecord) as RunRecord;
|
||||
}
|
||||
|
||||
async reportFailure(runId: string, report: RunnerFailureReport): Promise<{ reported: boolean; run: RunRecord | null; reportError: string | null }> {
|
||||
try {
|
||||
await this.appendEvent(runId, { type: "error", payload: { failureKind: report.failureKind, message: report.failureMessage, source: "agentrun-runner" } });
|
||||
const run = await this.reportStatus(runId, report);
|
||||
return { reported: true, run, reportError: null };
|
||||
} catch (error) {
|
||||
return { reported: false, run: null, reportError: errorMessage(error) };
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export function failureKindFromError(error: unknown): FailureKind {
|
||||
if (error instanceof AgentRunError) return error.failureKind;
|
||||
const message = errorMessage(error).toLowerCase();
|
||||
if (message.includes("auth") || message.includes("unauthorized") || message.includes("forbidden")) return "provider-auth-failed";
|
||||
if (message.includes("timeout")) return "backend-timeout";
|
||||
if (message.includes("lease") || message.includes("claim")) return "runner-lease-conflict";
|
||||
return "infra-failed";
|
||||
}
|
||||
|
||||
export function terminalStatusForFailure(failureKind: FailureKind): TerminalStatus {
|
||||
if (failureKind === "cancelled") return "cancelled";
|
||||
if (failureKind === "secret-unavailable" || failureKind === "tenant-policy-denied" || failureKind === "schema-invalid") return "blocked";
|
||||
return "failed";
|
||||
}
|
||||
|
||||
export function errorMessage(error: unknown): string {
|
||||
if (error instanceof Error) return error.message;
|
||||
return String(error);
|
||||
}
|
||||
+41
-19
@@ -1,35 +1,57 @@
|
||||
import { ManagerClient } from "../mgr/client.js";
|
||||
import { RunnerManagerApi, failureKindFromError, terminalStatusForFailure, errorMessage } from "./manager-api.js";
|
||||
import { runBackendTurn, type BackendAdapterOptions } from "../backend/adapter.js";
|
||||
import type { CommandRecord, JsonRecord, RunRecord, RunnerRecord } from "../common/types.js";
|
||||
import type { BackendProfile, JsonRecord, RunRecord, RunnerRecord } from "../common/types.js";
|
||||
|
||||
export interface RunnerOnceOptions extends BackendAdapterOptions {
|
||||
managerUrl: string;
|
||||
runId: string;
|
||||
commandId?: string;
|
||||
runnerId?: string;
|
||||
attemptId?: string;
|
||||
leaseMs?: number;
|
||||
backendProfile?: BackendProfile;
|
||||
placement?: "host-process" | "kubernetes-job";
|
||||
sourceCommit?: string;
|
||||
jobName?: string;
|
||||
podName?: string;
|
||||
logPath?: string;
|
||||
}
|
||||
|
||||
export async function runOnce(options: RunnerOnceOptions): Promise<JsonRecord> {
|
||||
const client = new ManagerClient(options.managerUrl);
|
||||
const runner = await client.post("/api/v1/runners/register", {
|
||||
id: options.runnerId ?? undefined,
|
||||
const api = new RunnerManagerApi(options.managerUrl);
|
||||
const leaseMs = options.leaseMs ?? 60_000;
|
||||
const attemptId = options.attemptId ?? `attempt_${Date.now().toString(36)}`;
|
||||
const runner = await api.register({
|
||||
runId: options.runId,
|
||||
attemptId: options.attemptId ?? `attempt_${Date.now().toString(36)}`,
|
||||
backendProfile: "codex",
|
||||
placement: "host-process",
|
||||
sourceCommit: process.env.AGENTRUN_SOURCE_COMMIT ?? "unknown",
|
||||
} as JsonRecord) as unknown as RunnerRecord;
|
||||
const claimed = await client.post(`/api/v1/runs/${options.runId}/claim`, { runnerId: runner.id, leaseMs: options.leaseMs ?? 60_000 }) as unknown as RunRecord;
|
||||
const commandsResponse = await client.get(`/api/v1/runs/${options.runId}/commands?afterSeq=0&limit=20`) as { items?: CommandRecord[] };
|
||||
const command = commandsResponse.items?.find((item) => item.state === "pending" && item.type === "turn");
|
||||
if (!command) {
|
||||
await client.patch(`/api/v1/runs/${options.runId}/status`, { terminalStatus: "blocked", failureKind: "schema-invalid", failureMessage: "no pending turn command" });
|
||||
return { runner, claimed, terminalStatus: "blocked", failureKind: "schema-invalid" };
|
||||
attemptId,
|
||||
backendProfile: options.backendProfile ?? "codex",
|
||||
placement: options.placement ?? "host-process",
|
||||
sourceCommit: options.sourceCommit ?? process.env.AGENTRUN_SOURCE_COMMIT ?? "unknown",
|
||||
...(options.runnerId ? { runnerId: options.runnerId } : {}),
|
||||
...(options.jobName ? { jobName: options.jobName } : {}),
|
||||
...(options.podName ? { podName: options.podName } : {}),
|
||||
...(options.logPath ? { logPath: options.logPath } : {}),
|
||||
}) as RunnerRecord;
|
||||
let claimed: RunRecord;
|
||||
try {
|
||||
claimed = await api.claim(options.runId, runner.id, leaseMs);
|
||||
await api.heartbeat(options.runId, runner.id, leaseMs);
|
||||
} catch (error) {
|
||||
const failureKind = failureKindFromError(error);
|
||||
if (failureKind !== "runner-lease-conflict") {
|
||||
await api.reportFailure(options.runId, { terminalStatus: terminalStatusForFailure(failureKind), failureKind, failureMessage: errorMessage(error) });
|
||||
}
|
||||
await client.post(`/api/v1/commands/${command.id}/ack`, {});
|
||||
throw error;
|
||||
}
|
||||
const commandsResponse = await api.pollCommands(options.runId, { afterSeq: 0, limit: 20, ...(options.commandId ? { commandId: options.commandId } : {}) });
|
||||
const command = commandsResponse.selected;
|
||||
if (!command) {
|
||||
await api.reportStatus(options.runId, { terminalStatus: "blocked", failureKind: "schema-invalid", failureMessage: "no pending turn command" });
|
||||
return { runner, claimed, terminalStatus: "blocked", failureKind: "schema-invalid", polledCommands: commandsResponse.items.length };
|
||||
}
|
||||
await api.ackCommand(command.id);
|
||||
const result = await runBackendTurn(claimed, command, options);
|
||||
for (const event of result.events) await client.post(`/api/v1/runs/${options.runId}/events`, event as unknown as JsonRecord);
|
||||
const finalRun = await client.patch(`/api/v1/runs/${options.runId}/status`, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage }) as unknown as RunRecord;
|
||||
for (const event of result.events) await api.appendEvent(options.runId, event);
|
||||
const finalRun = await api.reportStatus(options.runId, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage }) as RunRecord;
|
||||
return { runner, commandId: command.id, terminalStatus: result.terminalStatus, failureKind: result.failureKind, run: finalRun } as JsonRecord;
|
||||
}
|
||||
|
||||
+9
-1
@@ -6,6 +6,7 @@ import assert from "node:assert/strict";
|
||||
import { startManagerServer } from "../mgr/server.js";
|
||||
import { ManagerClient } from "../mgr/client.js";
|
||||
import { runOnce } from "../runner/run-once.js";
|
||||
import { renderRunnerJobDryRun } from "../runner/k8s-job.js";
|
||||
import { redactText } from "../common/redaction.js";
|
||||
|
||||
const root = path.resolve(path.dirname(fileURLToPath(import.meta.url)), "../..");
|
||||
@@ -45,18 +46,25 @@ try {
|
||||
const command = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "hello" }, idempotencyKey: "selftest-turn" }) as { id: string };
|
||||
const duplicate = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "hello" }, idempotencyKey: "selftest-turn" }) as { id: string };
|
||||
assert.equal(duplicate.id, command.id);
|
||||
const rendered = renderRunnerJobDryRun({ run: await client.get(`/api/v1/runs/${run.id}`) as never, commandId: command.id, managerUrl: server.baseUrl, image: "127.0.0.1:5000/agentrun/agentrun-mgr@sha256:1111111111111111111111111111111111111111111111111111111111111111", attemptId: "attempt_selftest", sourceCommit: "self-test" });
|
||||
assert.equal(rendered.dryRun, true);
|
||||
assert.equal(rendered.mutation, false);
|
||||
assert.equal((rendered.jobIdentity as { serviceAccountName?: string }).serviceAccountName, "agentrun-v01-runner");
|
||||
assert.equal(JSON.stringify(rendered).includes("test-token-material"), false);
|
||||
const fakePath = path.join(root, "src/selftest/fake-codex-app-server.ts");
|
||||
const fakeCommand = process.env.AGENTRUN_SELFTEST_CODEX_COMMAND ?? process.execPath;
|
||||
const fakeArgs = process.env.AGENTRUN_SELFTEST_CODEX_ARGS ? JSON.parse(process.env.AGENTRUN_SELFTEST_CODEX_ARGS) as string[] : [fakePath];
|
||||
const result = await runOnce({ managerUrl: server.baseUrl, runId: run.id, codexCommand: fakeCommand, codexArgs: fakeArgs, codexHome, env: { CODEX_HOME: codexHome } });
|
||||
assert.equal(result.terminalStatus, "completed");
|
||||
assert.equal(typeof (result.runner as { id?: unknown }).id, "string");
|
||||
const events = await client.get(`/api/v1/runs/${run.id}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> };
|
||||
assert.ok(events.items?.some((event) => event.type === "assistant_message"));
|
||||
assert.ok(events.items?.some((event) => event.type === "backend_status" && JSON.stringify(event.payload).includes("run-claimed")));
|
||||
assert.equal(JSON.stringify(events).includes("test-token-material"), false);
|
||||
assert.equal(JSON.stringify(events).includes("Bearer test-token"), false);
|
||||
const finalRun = await client.get(`/api/v1/runs/${run.id}`) as { terminalStatus?: string };
|
||||
assert.equal(finalRun.terminalStatus, "completed");
|
||||
console.log(JSON.stringify({ ok: true, tests: ["manager-memory-lifecycle", "codex-stdio-fake-turn", "redaction"], runId: run.id }));
|
||||
console.log(JSON.stringify({ ok: true, tests: ["manager-memory-lifecycle", "runner-lease-heartbeat", "runner-k8s-job-dry-run", "codex-stdio-fake-turn", "redaction"], runId: run.id, jobName: (rendered.jobIdentity as { name?: string }).name }));
|
||||
} finally {
|
||||
await new Promise<void>((resolve) => server.server.close(() => resolve()));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user