From 5deb9fa7fdcfe5377ad1defc96285f8016d91da1 Mon Sep 17 00:00:00 2001 From: Codex Date: Fri, 29 May 2026 10:52:41 +0800 Subject: [PATCH] feat: add v0.1 runtime skeleton --- .gitignore | 1 + deploy/deploy.json | 57 ++++++ package.json | 16 ++ scripts/agentrun-cli.ts | 4 + scripts/src/cli.ts | 134 ++++++++++++++ src/backend/adapter.ts | 29 +++ src/backend/codex-stdio.ts | 246 ++++++++++++++++++++++++++ src/common/errors.ts | 28 +++ src/common/redaction.ts | 22 +++ src/common/types.ts | 137 ++++++++++++++ src/common/validation.ts | 88 +++++++++ src/mgr/client.ts | 32 ++++ src/mgr/main.ts | 6 + src/mgr/server.ts | 116 ++++++++++++ src/mgr/store.ts | 138 +++++++++++++++ src/runner/main.ts | 19 ++ src/runner/run-once.ts | 35 ++++ src/selftest/fake-codex-app-server.ts | 48 +++++ src/selftest/run.ts | 65 +++++++ tsconfig.json | 17 ++ 20 files changed, 1238 insertions(+) create mode 100644 deploy/deploy.json create mode 100644 package.json create mode 100644 scripts/agentrun-cli.ts create mode 100644 scripts/src/cli.ts create mode 100644 src/backend/adapter.ts create mode 100644 src/backend/codex-stdio.ts create mode 100644 src/common/errors.ts create mode 100644 src/common/redaction.ts create mode 100644 src/common/types.ts create mode 100644 src/common/validation.ts create mode 100644 src/mgr/client.ts create mode 100644 src/mgr/main.ts create mode 100644 src/mgr/server.ts create mode 100644 src/mgr/store.ts create mode 100644 src/runner/main.ts create mode 100644 src/runner/run-once.ts create mode 100644 src/selftest/fake-codex-app-server.ts create mode 100644 src/selftest/run.ts create mode 100644 tsconfig.json diff --git a/.gitignore b/.gitignore index 5bd2d9b..9148ecd 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ .state/ logs/ node_modules/ +package-lock.json dist/ build/ coverage/ diff --git a/deploy/deploy.json b/deploy/deploy.json new file mode 100644 index 0000000..b5fbd2e --- /dev/null +++ b/deploy/deploy.json @@ -0,0 +1,57 @@ +{ + "lane": "v0.1", + "runtimeNamespace": "agentrun-v01", + "gitopsBranch": "v0.1-gitops", + "runtimePath": "deploy/gitops/g14/runtime-v01", + "services": [ + { + "id": "agentrun-mgr", + "component": "manager", + "enabled": true, + "replicas": 1, + "ports": [{ "name": "http", "containerPort": 8080 }], + "health": { + "livePath": "/health/live", + "readinessPath": "/health/readiness" + }, + "env": [ + { "name": "AGENTRUN_LANE", "value": "v0.1" }, + { "name": "AGENTRUN_RUNTIME_NAMESPACE", "value": "agentrun-v01" }, + { "name": "DATABASE_URL", "secretRef": { "name": "agentrun-v01-mgr-db", "key": "DATABASE_URL" } } + ], + "resources": { + "requests": { "cpu": "100m", "memory": "256Mi" }, + "limits": { "cpu": "800m", "memory": "1Gi" } + } + }, + { + "id": "agentrun-runner", + "component": "runner-job-template", + "enabled": true, + "serviceAccount": "agentrun-v01-runner", + "secretMounts": [ + { + "name": "codex-home", + "secretRef": { "name": "agentrun-v01-provider-codex", "keys": ["auth.json", "config.toml"] }, + "mountPath": "/home/agentrun/.codex", + "readOnly": true + } + ], + "env": [ + { "name": "HOME", "value": "/home/agentrun" }, + { "name": "AGENTRUN_LANE", "value": "v0.1" } + ], + "resources": { + "requests": { "cpu": "250m", "memory": "512Mi" }, + "limits": { "cpu": "2", "memory": "4Gi" } + } + } + ], + "postgres": { + "statefulSet": "agentrun-v01-postgres", + "service": "agentrun-v01-postgres", + "database": "agentrun_v01", + "secretRef": { "name": "agentrun-v01-postgres" }, + "dataPvc": "agentrun-v01-postgres-data" + } +} diff --git a/package.json b/package.json new file mode 100644 index 0000000..ea5c1f2 --- /dev/null +++ b/package.json @@ -0,0 +1,16 @@ +{ + "name": "agentrun", + "version": "0.1.0", + "private": true, + "type": "module", + "scripts": { + "check": "tsc --noEmit", + "self-test": "bun run src/selftest/run.ts", + "test": "bun run src/selftest/run.ts", + "cli": "bun scripts/agentrun-cli.ts" + }, + "devDependencies": { + "@types/node": "^22.10.0", + "typescript": "^5.8.3" + } +} diff --git a/scripts/agentrun-cli.ts b/scripts/agentrun-cli.ts new file mode 100644 index 0000000..0010f24 --- /dev/null +++ b/scripts/agentrun-cli.ts @@ -0,0 +1,4 @@ +#!/usr/bin/env bun +import { runCli } from "./src/cli.js"; + +await runCli(process.argv.slice(2)); diff --git a/scripts/src/cli.ts b/scripts/src/cli.ts new file mode 100644 index 0000000..169838c --- /dev/null +++ b/scripts/src/cli.ts @@ -0,0 +1,134 @@ +import { readFile } from "node:fs/promises"; +import { startManagerServer } from "../../src/mgr/server.js"; +import { ManagerClient } from "../../src/mgr/client.js"; +import { runOnce } from "../../src/runner/run-once.js"; +import type { JsonRecord, JsonValue } from "../../src/common/types.js"; +import { AgentRunError, errorToJson } from "../../src/common/errors.js"; +import type { RunnerOnceOptions } from "../../src/runner/run-once.js"; + +interface ParsedArgs { + positional: string[]; + flags: Map; +} + +export async function runCli(argv: string[]): Promise { + try { + const result = await dispatch(parseArgs(argv)); + print({ ok: true, data: result }); + } catch (error) { + const status = error instanceof AgentRunError ? error.httpStatus : 1; + print({ ok: false, ...(error instanceof AgentRunError ? { failureKind: error.failureKind, message: error.message } : { failureKind: "infra-failed", message: error instanceof Error ? error.message : String(error) }), error: errorToJson(error) }); + process.exitCode = status === 0 ? 1 : status > 255 ? 1 : status; + } +} + +async function dispatch(args: ParsedArgs): Promise { + const [group, command, id] = args.positional; + if (!group || group === "help") return help(); + if (group === "server" && command === "start") return startServer(args); + if (group === "server" && command === "status") return client(args).get("/health/readiness"); + if (group === "backends" && command === "list") return client(args).get("/api/v1/backends"); + if (group === "runs" && command === "create") return client(args).post("/api/v1/runs", await jsonFile(args)); + if (group === "runs" && command === "show" && id) return client(args).get(`/api/v1/runs/${encodeURIComponent(id)}`); + if (group === "runs" && command === "events" && id) return client(args).get(`/api/v1/runs/${encodeURIComponent(id)}/events?afterSeq=${flag(args, "after-seq", "0")}&limit=${flag(args, "limit", "100")}`); + if (group === "commands" && command === "create" && id) { + const body = await jsonFile(args); + if (!body.type) body.type = flag(args, "type", "turn"); + const idempotencyKey = optionalFlag(args, "idempotency-key"); + if (idempotencyKey) body.idempotencyKey = idempotencyKey; + return client(args).post(`/api/v1/runs/${encodeURIComponent(id)}/commands`, body); + } + if (group === "commands" && command === "show" && id) { + const runId = flag(args, "run-id", ""); + if (!runId) throw new AgentRunError("schema-invalid", "commands show requires --run-id", { httpStatus: 2 }); + return client(args).get(`/api/v1/runs/${encodeURIComponent(runId)}/commands/${encodeURIComponent(id)}`); + } + if (group === "runner" && command === "start") { + const runId = flag(args, "run-id", ""); + if (!runId) throw new AgentRunError("schema-invalid", "runner start requires --run-id", { httpStatus: 2 }); + const options: RunnerOnceOptions = { + managerUrl: managerUrl(args), + runId, + }; + const runnerId = optionalFlag(args, "runner-id"); + const codexCommand = optionalFlag(args, "codex-command"); + const codexHome = optionalFlag(args, "codex-home") ?? process.env.CODEX_HOME; + if (runnerId) options.runnerId = runnerId; + if (codexCommand) options.codexCommand = codexCommand; + if (codexHome) options.codexHome = codexHome; + return runOnce(options) as unknown as JsonValue; + } + throw new AgentRunError("schema-invalid", `unsupported command: ${args.positional.join(" ")}`, { httpStatus: 2 }); +} + +async function startServer(args: ParsedArgs): Promise { + const port = Number(flag(args, "port", "8080")); + const host = flag(args, "host", "0.0.0.0"); + const started = await startManagerServer({ port, host }); + return { serviceId: "agentrun-mgr", baseUrl: started.baseUrl, pid: process.pid, note: "foreground process; use Kubernetes/Tekton for v0.1 runtime" }; +} + +function client(args: ParsedArgs): ManagerClient { + return new ManagerClient(managerUrl(args)); +} + +function managerUrl(args: ParsedArgs): string { + return optionalFlag(args, "manager-url") ?? process.env.AGENTRUN_MGR_URL ?? "http://127.0.0.1:8080"; +} + +async function jsonFile(args: ParsedArgs): Promise { + const file = optionalFlag(args, "json-file"); + if (!file) throw new AgentRunError("schema-invalid", "--json-file is required", { httpStatus: 2 }); + const value = JSON.parse(await readFile(file, "utf8")) as unknown; + if (typeof value === "object" && value !== null && !Array.isArray(value)) return value as JsonRecord; + throw new AgentRunError("schema-invalid", "json file must contain an object", { httpStatus: 2 }); +} + +function parseArgs(argv: string[]): ParsedArgs { + const positional: string[] = []; + const flags = new Map(); + for (let index = 0; index < argv.length; index += 1) { + const item = argv[index] ?? ""; + if (!item.startsWith("--")) { + positional.push(item); + continue; + } + const key = item.slice(2); + const next = argv[index + 1]; + if (next === undefined || next.startsWith("--")) flags.set(key, true); + else { + flags.set(key, next); + index += 1; + } + } + return { positional, flags }; +} + +function flag(args: ParsedArgs, name: string, fallback: string): string { + const value = args.flags.get(name); + return typeof value === "string" ? value : fallback; +} + +function optionalFlag(args: ParsedArgs, name: string): string | null { + const value = args.flags.get(name); + return typeof value === "string" && value.length > 0 ? value : null; +} + +function help(): JsonRecord { + return { + commands: [ + "runs create --json-file ", + "runs show ", + "runs events --after-seq --limit ", + "commands create --type turn --json-file ", + "commands show --run-id ", + "runner start --run-id ", + "backends list", + "server start|status", + ], + }; +} + +function print(value: JsonRecord): void { + process.stdout.write(`${JSON.stringify(value)}\n`); +} diff --git a/src/backend/adapter.ts b/src/backend/adapter.ts new file mode 100644 index 0000000..d12ff54 --- /dev/null +++ b/src/backend/adapter.ts @@ -0,0 +1,29 @@ +import type { BackendTurnResult, CommandRecord, RunRecord } from "../common/types.js"; +import { runCodexStdioTurn, type CodexStdioTurnOptions } from "./codex-stdio.js"; + +export interface BackendAdapterOptions { + codexCommand?: string; + codexArgs?: string[]; + codexHome?: string; + env?: NodeJS.ProcessEnv; +} + +export async function runBackendTurn(run: RunRecord, command: CommandRecord, options: BackendAdapterOptions = {}): Promise { + if (run.backendProfile !== "codex") { + return { terminalStatus: "failed", failureKind: "backend-failed", failureMessage: `unsupported backendProfile ${run.backendProfile}`, events: [{ type: "error", payload: { failureKind: "backend-failed", backendProfile: run.backendProfile } }] }; + } + const prompt = typeof command.payload.prompt === "string" ? command.payload.prompt : JSON.stringify(command.payload); + const turnOptions: CodexStdioTurnOptions = { + prompt, + cwd: typeof run.workspaceRef.path === "string" ? run.workspaceRef.path : process.cwd(), + approvalPolicy: run.executionPolicy.approval, + sandbox: run.executionPolicy.sandbox, + timeoutMs: run.executionPolicy.timeoutMs, + }; + if (typeof command.payload.model === "string") turnOptions.model = command.payload.model; + if (options.codexCommand) turnOptions.command = options.codexCommand; + if (options.codexArgs) turnOptions.args = options.codexArgs; + if (options.env) turnOptions.env = options.env; + if (options.codexHome) turnOptions.codexHome = options.codexHome; + return runCodexStdioTurn(turnOptions); +} diff --git a/src/backend/codex-stdio.ts b/src/backend/codex-stdio.ts new file mode 100644 index 0000000..c82365c --- /dev/null +++ b/src/backend/codex-stdio.ts @@ -0,0 +1,246 @@ +import { spawn, type ChildProcessWithoutNullStreams } from "node:child_process"; +import { accessSync, constants as fsConstants } from "node:fs"; +import * as readline from "node:readline"; +import type { BackendEvent, BackendTurnResult, FailureKind, JsonRecord, JsonValue, TerminalStatus } from "../common/types.js"; +import { redactJson, redactText } from "../common/redaction.js"; + +export interface CodexStdioTurnOptions { + prompt: string; + cwd: string; + model?: string; + approvalPolicy: string; + sandbox: string; + timeoutMs: number; + command?: string; + args?: string[]; + env?: NodeJS.ProcessEnv; + codexHome?: string; +} + +interface PendingRequest { + resolve: (value: unknown) => void; + reject: (error: Error) => void; +} + +export class CodexStdioClient { + private readonly child: ChildProcessWithoutNullStreams; + private readonly pending = new Map(); + private readonly stderrChunks: Buffer[] = []; + private nextId = 1; + private closed = false; + readonly closedPromise: Promise<{ code: number | null; signal: string | null; stderrTail: string }>; + private closeResolve!: (value: { code: number | null; signal: string | null; stderrTail: string }) => void; + + constructor(options: { command?: string; args?: string[]; cwd: string; env?: NodeJS.ProcessEnv; onNotification: (message: JsonRecord) => void }) { + this.closedPromise = new Promise((resolve) => { this.closeResolve = resolve; }); + this.child = spawn(options.command ?? "codex", options.args ?? ["app-server", "--listen", "stdio://"], { + cwd: options.cwd, + env: options.env ?? process.env, + stdio: "pipe", + }); + this.child.stderr.on("data", (chunk: Buffer) => { + this.stderrChunks.push(chunk); + while (Buffer.concat(this.stderrChunks).length > 64_000) this.stderrChunks.shift(); + }); + const rl = readline.createInterface({ input: this.child.stdout, crlfDelay: Infinity }); + void this.readLines(rl, options.onNotification); + this.child.on("close", (code, signal) => this.handleClose(code, signal)); + this.child.on("error", (error) => this.handleClose(127, error.message)); + } + + request(method: string, params: JsonRecord): Promise { + if (this.closed) return Promise.reject(new Error("codex app-server is closed")); + const id = this.nextId++; + const message = { id, method, params }; + return new Promise((resolve, reject) => { + this.pending.set(id, { resolve, reject }); + this.child.stdin.write(`${JSON.stringify(message)}\n`); + }); + } + + notify(method: string, params: JsonRecord = {}): void { + if (!this.closed) this.child.stdin.write(`${JSON.stringify({ method, params })}\n`); + } + + stop(): void { + if (this.closed) return; + this.child.kill("SIGTERM"); + setTimeout(() => { + if (!this.closed) this.child.kill("SIGKILL"); + }, 1500).unref?.(); + } + + private async readLines(rl: readline.Interface, onNotification: (message: JsonRecord) => void): Promise { + try { + for await (const line of rl) { + const trimmed = String(line).trim(); + if (trimmed.length === 0) continue; + const message = JSON.parse(trimmed) as JsonRecord; + const id = typeof message.id === "number" ? message.id : null; + const method = typeof message.method === "string" ? message.method : null; + if (id !== null && method === null) { + const pending = this.pending.get(id); + if (!pending) continue; + this.pending.delete(id); + if (message.error !== undefined) pending.reject(new Error(JSON.stringify(redactJson(message.error)))); + else pending.resolve(message.result); + continue; + } + if (id !== null && method !== null) { + this.handleServerRequest(id, method); + continue; + } + if (method !== null) onNotification(message); + } + } catch (error) { + this.rejectAll(error instanceof Error ? error : new Error(String(error))); + } + } + + private handleServerRequest(id: number, method: string): void { + if (method === "item/commandExecution/requestApproval" || method === "item/fileChange/requestApproval") { + this.child.stdin.write(`${JSON.stringify({ id, result: { decision: "decline" } })}\n`); + return; + } + this.child.stdin.write(`${JSON.stringify({ id, error: { code: -32601, message: `Unsupported client-side request: ${method}` } })}\n`); + } + + private rejectAll(error: Error): void { + for (const pending of this.pending.values()) pending.reject(error); + this.pending.clear(); + } + + private handleClose(code: number | null, signal: string | null): void { + if (this.closed) return; + this.closed = true; + const stderrTail = redactText(Buffer.concat(this.stderrChunks).toString("utf8").slice(-8000)); + this.rejectAll(new Error(`codex app-server closed code=${code} signal=${signal}`)); + this.closeResolve({ code, signal, stderrTail }); + } +} + +export async function runCodexStdioTurn(options: CodexStdioTurnOptions): Promise { + const secretFailure = codexHomeReadiness(options.codexHome ?? options.env?.CODEX_HOME ?? `${options.env?.HOME ?? process.env.HOME ?? ""}/.codex`); + if (secretFailure) return secretFailure; + const events: BackendEvent[] = [{ type: "backend_status", payload: { phase: "codex-app-server-starting", protocol: "codex-app-server-jsonrpc-stdio" } }]; + let assistantText = ""; + let threadId: string | undefined; + let turnId: string | undefined; + let terminal: { status: TerminalStatus; failureKind: FailureKind | null; message: string | null } | null = null; + let terminalResolve!: () => void; + const terminalPromise = new Promise((resolve) => { terminalResolve = resolve; }); + const clientOptions: ConstructorParameters[0] = { + cwd: options.cwd, + env: childEnv(options), + onNotification: (message) => { + const normalized = normalizeCodexNotification(message); + if (normalized.threadId) threadId = normalized.threadId; + if (normalized.turnId) turnId = normalized.turnId; + if (normalized.assistantDelta) assistantText += normalized.assistantDelta; + events.push(...normalized.events); + if (normalized.terminal) { + terminal = normalized.terminal; + terminalResolve(); + } + }, + }; + if (options.command) clientOptions.command = options.command; + if (options.args) clientOptions.args = options.args; + const client = new CodexStdioClient(clientOptions); + const timeout = setTimeout(() => { + if (!terminal) { + terminal = { status: "failed", failureKind: "backend-timeout", message: `codex stdio turn timed out after ${options.timeoutMs}ms` }; + events.push({ type: "error", payload: terminal }); + client.stop(); + terminalResolve(); + } + }, options.timeoutMs); + try { + await client.request("initialize", { clientInfo: { name: "agentrun", title: "AgentRun", version: "0.1.0" }, capabilities: { experimentalApi: true } }); + const threadResponse = asResponseRecord(await client.request("thread/start", { model: options.model ?? "default", cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox, serviceName: "agentrun" })); + threadId = stringAt(asRecordAt(threadResponse, "thread"), "id") ?? threadId; + const turnResponse = asResponseRecord(await client.request("turn/start", { threadId: threadId ?? "", input: [{ type: "text", text: options.prompt, text_elements: [] }], cwd: options.cwd, approvalPolicy: options.approvalPolicy, model: options.model ?? "default" })); + turnId = stringAt(asRecordAt(turnResponse, "turn"), "id") ?? turnId; + await Promise.race([terminalPromise, client.closedPromise]); + if (!terminal) terminal = { status: "failed", failureKind: "backend-protocol-error", message: "codex app-server closed before turn/completed" }; + } catch (error) { + terminal = { status: "failed", failureKind: "backend-protocol-error", message: error instanceof Error ? error.message : String(error) }; + events.push({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message } }); + } finally { + clearTimeout(timeout); + client.stop(); + } + if (assistantText.trim().length > 0) events.push({ type: "assistant_message", payload: { text: assistantText } }); + events.push({ type: "terminal_status", payload: { terminalStatus: terminal.status, failureKind: terminal.failureKind, message: terminal.message } }); + return { terminalStatus: terminal.status, failureKind: terminal.failureKind, failureMessage: terminal.message, events: events.map((event) => ({ ...event, payload: redactJson(event.payload) })), ...(threadId ? { threadId } : {}), ...(turnId ? { turnId } : {}) }; +} + +function codexHomeReadiness(codexHome: string): BackendTurnResult | null { + try { + accessSync(`${codexHome}/auth.json`, fsConstants.R_OK); + accessSync(`${codexHome}/config.toml`, fsConstants.R_OK); + return null; + } catch { + return { + terminalStatus: "blocked", + failureKind: "secret-unavailable", + failureMessage: "Codex auth.json or config.toml projection is not readable", + events: [ + { type: "error", payload: { failureKind: "secret-unavailable", credentialSource: { codexHome, valuesPrinted: false } } }, + { type: "terminal_status", payload: { terminalStatus: "blocked", failureKind: "secret-unavailable" } }, + ], + }; + } +} + +function normalizeCodexNotification(message: JsonRecord): { events: BackendEvent[]; assistantDelta?: string; threadId?: string; turnId?: string; terminal?: { status: TerminalStatus; failureKind: FailureKind | null; message: string | null } } { + const method = typeof message.method === "string" ? message.method : "unknown"; + const params = asRecordAt(message, "params"); + if (method === "thread/started") { + const threadId = stringAt(asRecordAt(params, "thread"), "id"); + return { events: [{ type: "backend_status", payload: { phase: method, threadId } }], ...(threadId ? { threadId } : {}) }; + } + if (method === "turn/started") { + const turnId = stringAt(asRecordAt(params, "turn"), "id"); + return { events: [{ type: "backend_status", payload: { phase: method, turnId } }], ...(turnId ? { turnId } : {}) }; + } + if (method === "item/agentMessage/delta") return { events: [], assistantDelta: typeof params.delta === "string" ? params.delta : "" }; + if (method === "item/commandExecution/outputDelta") return { events: [{ type: "command_output", payload: { stream: "stdout", text: typeof params.delta === "string" ? params.delta : "" } }] }; + if (method === "item/started" || method === "item/completed") return { events: [{ type: "tool_call", payload: { method, item: redactJson(asRecordAt(params, "item")) } }] }; + if (method === "error") return { events: [{ type: "error", payload: { failureKind: "backend-failed", error: redactJson(params.error ?? params) } }] }; + if (method === "turn/completed") { + const turn = asRecordAt(params, "turn"); + const status = terminalStatusFromValue(turn.status); + const error = asRecordAt(turn, "error"); + const messageText = typeof error.message === "string" ? error.message : null; + return { events: [{ type: "backend_status", payload: { phase: method, terminalStatus: status } }], terminal: { status, failureKind: status === "completed" ? null : "backend-failed", message: messageText } }; + } + return { events: [{ type: "backend_status", payload: { phase: method } }] }; +} + +function terminalStatusFromValue(value: unknown): TerminalStatus { + if (value === "completed") return "completed"; + if (value === "cancelled" || value === "canceled") return "cancelled"; + if (value === "blocked") return "blocked"; + return "failed"; +} + +function childEnv(options: CodexStdioTurnOptions): NodeJS.ProcessEnv { + const env: NodeJS.ProcessEnv = { ...process.env, ...options.env }; + const codexHome = options.codexHome ?? options.env?.CODEX_HOME; + if (codexHome) env.CODEX_HOME = codexHome; + return env; +} + +function asResponseRecord(value: unknown): JsonRecord { + return typeof value === "object" && value !== null && !Array.isArray(value) ? value as JsonRecord : {}; +} + +function asRecordAt(value: JsonRecord, key: string): JsonRecord { + const next = value[key]; + return typeof next === "object" && next !== null && !Array.isArray(next) ? next as JsonRecord : {}; +} + +function stringAt(value: JsonRecord, key: string): string | null { + return typeof value[key] === "string" ? value[key] : null; +} diff --git a/src/common/errors.ts b/src/common/errors.ts new file mode 100644 index 0000000..81705aa --- /dev/null +++ b/src/common/errors.ts @@ -0,0 +1,28 @@ +import type { FailureKind, JsonRecord } from "./types.js"; + +export class AgentRunError extends Error { + readonly failureKind: FailureKind; + readonly details: JsonRecord | null; + readonly httpStatus: number; + + constructor(failureKind: FailureKind, message: string, options: { httpStatus?: number; details?: JsonRecord } = {}) { + super(message); + this.name = "AgentRunError"; + this.failureKind = failureKind; + this.httpStatus = options.httpStatus ?? 400; + this.details = options.details ?? null; + } +} + +export function errorToJson(error: unknown): JsonRecord { + if (error instanceof AgentRunError) { + return { + name: error.name, + failureKind: error.failureKind, + message: error.message, + details: error.details, + }; + } + if (error instanceof Error) return { name: error.name, message: error.message, stack: error.stack ?? null }; + return { name: "UnknownError", message: String(error) }; +} diff --git a/src/common/redaction.ts b/src/common/redaction.ts new file mode 100644 index 0000000..43f06d1 --- /dev/null +++ b/src/common/redaction.ts @@ -0,0 +1,22 @@ +export function redactText(value: string): string { + return value + .replace(/(authorization\s*[:=]\s*)(bearer\s+)?[A-Za-z0-9._~+/=-]+/giu, "$1$2REDACTED") + .replace(/((?:api[_-]?key|token|password|secret)\s*[:=]\s*)[A-Za-z0-9._~+/=-]+/giu, "$1REDACTED") + .replace(/(postgres(?:ql)?:\/\/[^:\s/@]+:)[^@\s]+(@)/giu, "$1REDACTED$2") + .replace(/(https?:\/\/[^:\s/@]+:)[^@\s]+(@)/giu, "$1REDACTED$2"); +} + +export function redactJson(value: T): T { + if (typeof value === "string") return redactText(value) as T; + if (Array.isArray(value)) return value.map((item) => redactJson(item)) as T; + if (typeof value !== "object" || value === null) return value; + const result: Record = {}; + for (const [key, entry] of Object.entries(value)) { + if (/auth|authorization|api[_-]?key|token|password|secret|credential|dsn/iu.test(key)) { + result[key] = typeof entry === "boolean" ? entry : "REDACTED"; + continue; + } + result[key] = redactJson(entry); + } + return result as T; +} diff --git a/src/common/types.ts b/src/common/types.ts new file mode 100644 index 0000000..1ef153e --- /dev/null +++ b/src/common/types.ts @@ -0,0 +1,137 @@ +export type JsonPrimitive = string | number | boolean | null; +export type JsonValue = JsonPrimitive | JsonValue[] | { [key: string]: JsonValue }; +export type JsonRecord = { [key: string]: JsonValue }; + +export type FailureKind = + | "schema-invalid" + | "tenant-policy-denied" + | "secret-unavailable" + | "runner-lease-conflict" + | "backend-failed" + | "backend-protocol-error" + | "backend-timeout" + | "provider-auth-failed" + | "provider-rate-limited" + | "infra-failed" + | "cancelled"; + +export type RunStatus = "pending" | "claimed" | "running" | "completed" | "failed" | "blocked" | "cancelled"; +export type CommandState = "pending" | "acknowledged" | "completed" | "failed" | "cancelled"; +export type TerminalStatus = "completed" | "failed" | "blocked" | "cancelled"; +export type BackendProfile = "codex"; + +export interface WorkspaceRef extends JsonRecord { + kind: "git-worktree" | "host-path" | "kubernetes-pvc" | "opaque"; + path?: string; + repo?: string; + branch?: string; +} + +export interface SecretRef extends JsonRecord { + namespace?: string; + name: string; + keys?: string[]; + mountPath?: string; +} + +export interface ExecutionPolicy extends JsonRecord { + sandbox: string; + approval: string; + timeoutMs: number; + network: string; + secretScope: { + providerCredentials?: Array<{ + profile: BackendProfile | string; + secretRef: SecretRef; + }>; + allowCredentialEcho?: false; + }; +} + +export interface CreateRunInput extends JsonRecord { + tenantId: string; + projectId: string; + workspaceRef: WorkspaceRef; + providerId: string; + backendProfile: BackendProfile; + executionPolicy: ExecutionPolicy; + traceSink: JsonValue; +} + +export interface RunRecord extends CreateRunInput { + id: string; + status: RunStatus; + terminalStatus: TerminalStatus | null; + failureKind: FailureKind | null; + failureMessage: string | null; + createdAt: string; + updatedAt: string; + claimedBy: string | null; + leaseExpiresAt: string | null; +} + +export interface CreateCommandInput extends JsonRecord { + type: "turn" | "interrupt"; + payload: JsonRecord; + idempotencyKey?: string; +} + +export interface CommandRecord extends CreateCommandInput { + id: string; + runId: string; + seq: number; + state: CommandState; + payloadHash: string; + createdAt: string; + updatedAt: string; + acknowledgedAt: string | null; +} + +export type EventType = "backend_status" | "assistant_message" | "tool_call" | "command_output" | "diff" | "error" | "terminal_status"; + +export interface RunEvent extends JsonRecord { + id: string; + runId: string; + seq: number; + type: EventType; + payload: JsonRecord; + createdAt: string; +} + +export interface RunnerRecord extends JsonRecord { + id: string; + runId?: string; + attemptId?: string; + backendProfile?: BackendProfile; + placement?: string; + sourceCommit?: string; + registeredAt: string; + heartbeatAt: string; +} + +export interface BackendEvent { + type: EventType; + payload: JsonRecord; +} + +export interface BackendTurnResult { + terminalStatus: TerminalStatus; + failureKind: FailureKind | null; + failureMessage: string | null; + events: BackendEvent[]; + threadId?: string; + turnId?: string; +} + +export interface ApiErrorBody extends JsonRecord { + ok: false; + failureKind: FailureKind; + message: string; + traceId: string; +} + +export interface ApiOkBody extends JsonRecord { + ok: true; + data: T; + traceId: string; +} diff --git a/src/common/validation.ts b/src/common/validation.ts new file mode 100644 index 0000000..495ee5d --- /dev/null +++ b/src/common/validation.ts @@ -0,0 +1,88 @@ +import { createHash, randomUUID } from "node:crypto"; +import type { BackendProfile, CreateCommandInput, CreateRunInput, ExecutionPolicy, JsonRecord, JsonValue } from "./types.js"; +import { AgentRunError } from "./errors.js"; + +const allowedTenants = new Set(["unidesk", "hwlab"]); +const allowedBackends = new Set(["codex"]); + +export function nowIso(): string { + return new Date().toISOString(); +} + +export function newId(prefix: string): string { + return `${prefix}_${randomUUID().replace(/-/gu, "")}`; +} + +export function stableHash(value: JsonValue): string { + return createHash("sha256").update(JSON.stringify(sortJson(value))).digest("hex"); +} + +function sortJson(value: JsonValue): JsonValue { + if (Array.isArray(value)) return value.map(sortJson); + if (typeof value !== "object" || value === null) return value; + return Object.fromEntries(Object.entries(value).sort(([a], [b]) => a.localeCompare(b)).map(([key, entry]) => [key, sortJson(entry)])); +} + +export function asRecord(value: unknown, fieldName: string): JsonRecord { + if (typeof value === "object" && value !== null && !Array.isArray(value)) return value as JsonRecord; + throw new AgentRunError("schema-invalid", `${fieldName} must be an object`, { httpStatus: 400 }); +} + +function requiredString(record: JsonRecord, key: string): string { + const value = record[key]; + if (typeof value !== "string" || value.trim().length === 0) throw new AgentRunError("schema-invalid", `${key} is required`, { httpStatus: 400 }); + return value.trim(); +} + +function requiredRecord(record: JsonRecord, key: string): JsonRecord { + return asRecord(record[key], key); +} + +export function validateCreateRun(input: unknown): CreateRunInput { + const record = asRecord(input, "run"); + const tenantId = requiredString(record, "tenantId"); + if (!allowedTenants.has(tenantId)) throw new AgentRunError("tenant-policy-denied", `tenantId ${tenantId} is not allowed`, { httpStatus: 403 }); + const backendProfile = requiredString(record, "backendProfile") as BackendProfile; + if (!allowedBackends.has(backendProfile)) throw new AgentRunError("schema-invalid", `backendProfile ${backendProfile} is not supported in v0.1`, { httpStatus: 400 }); + const executionPolicy = validateExecutionPolicy(requiredRecord(record, "executionPolicy")); + return { + tenantId, + projectId: requiredString(record, "projectId"), + workspaceRef: requiredRecord(record, "workspaceRef") as CreateRunInput["workspaceRef"], + providerId: requiredString(record, "providerId"), + backendProfile, + executionPolicy, + traceSink: record.traceSink ?? null, + }; +} + +export function validateExecutionPolicy(record: JsonRecord): ExecutionPolicy { + const timeout = record.timeoutMs; + if (typeof timeout !== "number" || !Number.isFinite(timeout) || timeout <= 0) throw new AgentRunError("schema-invalid", "executionPolicy.timeoutMs must be a positive number", { httpStatus: 400 }); + const secretScope = asRecord(record.secretScope ?? {}, "executionPolicy.secretScope"); + if (secretScope.allowCredentialEcho !== undefined && secretScope.allowCredentialEcho !== false) throw new AgentRunError("tenant-policy-denied", "allowCredentialEcho must be false", { httpStatus: 403 }); + const providerCredentials = Array.isArray(secretScope.providerCredentials) ? secretScope.providerCredentials : []; + for (const credential of providerCredentials) { + const item = asRecord(credential, "providerCredential"); + const secretRef = asRecord(item.secretRef, "providerCredential.secretRef"); + if (typeof secretRef.name !== "string" || secretRef.name.length === 0) throw new AgentRunError("schema-invalid", "provider credential secretRef.name is required", { httpStatus: 400 }); + } + const secretScopeResult: ExecutionPolicy["secretScope"] = { allowCredentialEcho: false }; + if (providerCredentials.length > 0) secretScopeResult.providerCredentials = providerCredentials as NonNullable; + return { + sandbox: requiredString(record, "sandbox"), + approval: requiredString(record, "approval"), + timeoutMs: timeout, + network: requiredString(record, "network"), + secretScope: secretScopeResult, + }; +} + +export function validateCreateCommand(input: unknown): CreateCommandInput { + const record = asRecord(input, "command"); + const type = requiredString(record, "type"); + if (type !== "turn" && type !== "interrupt") throw new AgentRunError("schema-invalid", `command type ${type} is not supported`, { httpStatus: 400 }); + const payload = asRecord(record.payload ?? {}, "payload"); + const idempotencyKey = typeof record.idempotencyKey === "string" && record.idempotencyKey.trim().length > 0 ? record.idempotencyKey.trim() : undefined; + return { type, payload, ...(idempotencyKey ? { idempotencyKey } : {}) }; +} diff --git a/src/mgr/client.ts b/src/mgr/client.ts new file mode 100644 index 0000000..306dafb --- /dev/null +++ b/src/mgr/client.ts @@ -0,0 +1,32 @@ +import type { JsonRecord, JsonValue } from "../common/types.js"; +import { AgentRunError } from "../common/errors.js"; + +export class ManagerClient { + constructor(readonly baseUrl: string) {} + + async get(path: string): Promise { + return this.request("GET", path); + } + + async post(path: string, body: JsonValue): Promise { + return this.request("POST", path, body); + } + + async patch(path: string, body: JsonValue): Promise { + return this.request("PATCH", path, body); + } + + private async request(method: string, path: string, body?: JsonValue): Promise { + const init: RequestInit = { method }; + if (body !== undefined) { + init.headers = { "content-type": "application/json" }; + init.body = JSON.stringify(body); + } + const response = await fetch(new URL(path, this.baseUrl), init); + const text = await response.text(); + if (text.trim().length === 0) throw new AgentRunError("infra-failed", `manager returned empty response for ${method} ${path}`, { httpStatus: 502 }); + const envelope = JSON.parse(text) as JsonRecord; + if (envelope.ok !== true) throw new AgentRunError(typeof envelope.failureKind === "string" ? envelope.failureKind as never : "infra-failed", typeof envelope.message === "string" ? envelope.message : `manager request failed: ${method} ${path}`, { httpStatus: response.status, details: envelope }); + return envelope.data ?? null; + } +} diff --git a/src/mgr/main.ts b/src/mgr/main.ts new file mode 100644 index 0000000..876ad1c --- /dev/null +++ b/src/mgr/main.ts @@ -0,0 +1,6 @@ +import { startManagerServer } from "./server.js"; + +const port = Number(process.env.PORT ?? process.env.AGENTRUN_MGR_PORT ?? "8080"); +const host = process.env.HOST ?? "0.0.0.0"; +const started = await startManagerServer({ port, host }); +console.log(JSON.stringify({ ok: true, serviceId: "agentrun-mgr", baseUrl: started.baseUrl })); diff --git a/src/mgr/server.ts b/src/mgr/server.ts new file mode 100644 index 0000000..05845f8 --- /dev/null +++ b/src/mgr/server.ts @@ -0,0 +1,116 @@ +import type { Server } from "node:http"; +import { createServer } from "node:http"; +import type { AddressInfo } from "node:net"; +import type { AgentRunStore } from "./store.js"; +import { MemoryAgentRunStore } from "./store.js"; +import { AgentRunError, errorToJson } from "../common/errors.js"; +import { asRecord, validateCreateCommand, validateCreateRun } from "../common/validation.js"; +import type { ApiErrorBody, ApiOkBody, JsonRecord, JsonValue, RunEvent } from "../common/types.js"; + +export interface ManagerServerOptions { + store?: AgentRunStore; + port?: number; + host?: string; + sourceCommit?: string; +} + +export interface StartedManagerServer { + server: Server; + baseUrl: string; + store: AgentRunStore; +} + +export async function startManagerServer(options: ManagerServerOptions = {}): Promise { + const store = options.store ?? new MemoryAgentRunStore(); + const sourceCommit = options.sourceCommit ?? process.env.AGENTRUN_SOURCE_COMMIT ?? "unknown"; + const server = createServer(async (req, res) => { + const traceId = `trc_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}`; + try { + const method = req.method ?? "GET"; + const url = new URL(req.url ?? "/", "http://agentrun.local"); + const data = await route({ method, url, body: await readBody(req), store, sourceCommit }); + writeJson(res, 200, { ok: true, data, traceId }); + } catch (error) { + const agentError = normalizeError(error); + writeJson(res, agentError.httpStatus, { ok: false, failureKind: agentError.failureKind, message: agentError.message, traceId, error: errorToJson(error) }); + } + }); + await new Promise((resolve) => server.listen(options.port ?? 0, options.host ?? "127.0.0.1", resolve)); + const address = server.address() as AddressInfo; + return { server, baseUrl: `http://${address.address}:${address.port}`, store }; +} + +async function readBody(req: import("node:http").IncomingMessage): Promise { + if (req.method === "GET" || req.method === "HEAD") return null; + const chunks: Buffer[] = []; + for await (const chunk of req) chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)); + const text = Buffer.concat(chunks).toString("utf8").trim(); + if (text.length === 0) return null; + return JSON.parse(text) as unknown; +} + +async function route({ method, url, body, store, sourceCommit }: { method: string; url: URL; body: unknown; store: AgentRunStore; sourceCommit: string }): Promise { + const path = url.pathname; + if (method === "GET" && (path === "/health" || path === "/health/live" || path === "/health/readiness")) { + return { serviceId: "agentrun-mgr", live: true, ready: true, database: { adapter: "memory-self-test", migrationReady: true }, sourceCommit, secretRefs: { valuesPrinted: false } }; + } + if (method === "GET" && path === "/api/v1/backends") return { items: store.backends() }; + if (method === "POST" && path === "/api/v1/runs") return store.createRun(validateCreateRun(body)) as unknown as JsonValue; + const runMatch = path.match(/^\/api\/v1\/runs\/([^/]+)$/u); + if (method === "GET" && runMatch) return store.getRun(runMatch[1] ?? "") as unknown as JsonValue; + const eventMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/events$/u); + if (method === "GET" && eventMatch) { + const afterSeq = integerQuery(url, "afterSeq", 0); + const limit = integerQuery(url, "limit", 100); + return { items: store.listEvents(eventMatch[1] ?? "", afterSeq, limit) as unknown as JsonValue }; + } + const commandCreateMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands$/u); + if (method === "POST" && commandCreateMatch) return store.createCommand(commandCreateMatch[1] ?? "", validateCreateCommand(body)) as unknown as JsonValue; + if (method === "GET" && commandCreateMatch) return { items: store.listCommands(commandCreateMatch[1] ?? "", integerQuery(url, "afterSeq", 0), integerQuery(url, "limit", 20)) as unknown as JsonValue }; + const commandShowMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands\/([^/]+)$/u); + if (method === "GET" && commandShowMatch) return store.getCommand(commandShowMatch[2] ?? "") as unknown as JsonValue; + if (method === "POST" && path === "/api/v1/runners/register") return store.registerRunner(asRecord(body ?? {}, "runner")) as unknown as JsonValue; + const claimMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/claim$/u); + if (method === "POST" && claimMatch) { + const record = asRecord(body, "claim"); + const runnerId = typeof record.runnerId === "string" ? record.runnerId : ""; + if (runnerId.length === 0) throw new AgentRunError("schema-invalid", "runnerId is required", { httpStatus: 400 }); + return store.claimRun(claimMatch[1] ?? "", runnerId, numberField(record, "leaseMs", 60_000)) as unknown as JsonValue; + } + const eventsAppendMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/events$/u); + if (method === "POST" && eventsAppendMatch) { + const record = asRecord(body, "event"); + const type = typeof record.type === "string" ? record.type as RunEvent["type"] : "backend_status"; + return store.appendEvent(eventsAppendMatch[1] ?? "", type, asRecord(record.payload ?? {}, "event.payload")) as unknown as JsonValue; + } + const statusMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/status$/u); + if (method === "PATCH" && statusMatch) { + const record = asRecord(body, "status"); + const terminalStatus = record.terminalStatus === "completed" || record.terminalStatus === "failed" || record.terminalStatus === "blocked" || record.terminalStatus === "cancelled" ? record.terminalStatus : "failed"; + return store.finishRun(statusMatch[1] ?? "", { terminalStatus, failureKind: typeof record.failureKind === "string" ? record.failureKind as never : null, failureMessage: typeof record.failureMessage === "string" ? record.failureMessage : null }) as unknown as JsonValue; + } + const ackMatch = path.match(/^\/api\/v1\/commands\/([^/]+)\/ack$/u); + if (method === "POST" && ackMatch) return store.ackCommand(ackMatch[1] ?? "") as unknown as JsonValue; + throw new AgentRunError("schema-invalid", `unsupported route ${method} ${path}`, { httpStatus: 404 }); +} + +function integerQuery(url: URL, key: string, fallback: number): number { + const value = Number(url.searchParams.get(key)); + return Number.isInteger(value) && value >= 0 ? value : fallback; +} + +function numberField(record: JsonRecord, key: string, fallback: number): number { + const value = record[key]; + return typeof value === "number" && Number.isFinite(value) ? value : fallback; +} + +function normalizeError(error: unknown): AgentRunError { + if (error instanceof AgentRunError) return error; + return new AgentRunError("infra-failed", error instanceof Error ? error.message : String(error), { httpStatus: 500 }); +} + +function writeJson(res: import("node:http").ServerResponse, statusCode: number, body: ApiOkBody | ApiErrorBody): void { + const text = `${JSON.stringify(body)}\n`; + res.writeHead(statusCode, { "content-type": "application/json; charset=utf-8", "content-length": Buffer.byteLength(text) }); + res.end(text); +} diff --git a/src/mgr/store.ts b/src/mgr/store.ts new file mode 100644 index 0000000..44fa01a --- /dev/null +++ b/src/mgr/store.ts @@ -0,0 +1,138 @@ +import type { BackendProfile, BackendTurnResult, CommandRecord, CreateCommandInput, CreateRunInput, FailureKind, JsonRecord, RunEvent, RunnerRecord, RunRecord, TerminalStatus } from "../common/types.js"; +import { AgentRunError } from "../common/errors.js"; +import { newId, nowIso, stableHash } from "../common/validation.js"; +import { redactJson } from "../common/redaction.js"; + +export interface AgentRunStore { + createRun(input: CreateRunInput): RunRecord; + getRun(runId: string): RunRecord; + listEvents(runId: string, afterSeq: number, limit: number): RunEvent[]; + createCommand(runId: string, input: CreateCommandInput): CommandRecord; + getCommand(commandId: string): CommandRecord; + listCommands(runId: string, afterSeq: number, limit: number): CommandRecord[]; + registerRunner(input: Partial): RunnerRecord; + claimRun(runId: string, runnerId: string, leaseMs: number): RunRecord; + heartbeat(runId: string, runnerId: string, leaseMs: number): RunRecord; + ackCommand(commandId: string): CommandRecord; + appendEvent(runId: string, type: RunEvent["type"], payload: JsonRecord): RunEvent; + finishRun(runId: string, result: Pick): RunRecord; + backends(): JsonRecord[]; +} + +export class MemoryAgentRunStore implements AgentRunStore { + private readonly runs = new Map(); + private readonly commands = new Map(); + private readonly eventsByRun = new Map(); + private readonly runners = new Map(); + + createRun(input: CreateRunInput): RunRecord { + const at = nowIso(); + const run: RunRecord = { ...input, id: newId("run"), status: "pending", terminalStatus: null, failureKind: null, failureMessage: null, createdAt: at, updatedAt: at, claimedBy: null, leaseExpiresAt: null }; + this.runs.set(run.id, run); + this.eventsByRun.set(run.id, []); + this.appendEvent(run.id, "backend_status", { phase: "run-created", backendProfile: run.backendProfile }); + return run; + } + + getRun(runId: string): RunRecord { + const run = this.runs.get(runId); + if (!run) throw new AgentRunError("schema-invalid", `run ${runId} was not found`, { httpStatus: 404 }); + return run; + } + + listEvents(runId: string, afterSeq: number, limit: number): RunEvent[] { + this.getRun(runId); + return (this.eventsByRun.get(runId) ?? []).filter((event) => event.seq > afterSeq).slice(0, Math.max(1, Math.min(limit, 500))); + } + + createCommand(runId: string, input: CreateCommandInput): CommandRecord { + this.getRun(runId); + const payloadHash = stableHash(input.payload); + if (input.idempotencyKey) { + const existing = Array.from(this.commands.values()).find((command) => command.runId === runId && command.idempotencyKey === input.idempotencyKey); + if (existing) { + if (existing.payloadHash !== payloadHash) throw new AgentRunError("schema-invalid", "idempotency key reused with different payload", { httpStatus: 409 }); + return existing; + } + } + const at = nowIso(); + const seq = Array.from(this.commands.values()).filter((command) => command.runId === runId).length + 1; + const command: CommandRecord = { ...input, id: newId("cmd"), runId, seq, state: "pending", payloadHash, createdAt: at, updatedAt: at, acknowledgedAt: null }; + this.commands.set(command.id, command); + this.appendEvent(runId, "backend_status", { phase: "command-created", commandId: command.id, commandType: command.type }); + return command; + } + + getCommand(commandId: string): CommandRecord { + const command = this.commands.get(commandId); + if (!command) throw new AgentRunError("schema-invalid", `command ${commandId} was not found`, { httpStatus: 404 }); + return command; + } + + listCommands(runId: string, afterSeq: number, limit: number): CommandRecord[] { + this.getRun(runId); + return Array.from(this.commands.values()).filter((command) => command.runId === runId && command.seq > afterSeq).slice(0, Math.max(1, Math.min(limit, 100))); + } + + registerRunner(input: Partial): RunnerRecord { + const at = nowIso(); + const runner: RunnerRecord = { id: input.id ?? newId("runner"), registeredAt: at, heartbeatAt: at, ...input }; + this.runners.set(runner.id, runner); + return runner; + } + + claimRun(runId: string, runnerId: string, leaseMs: number): RunRecord { + const run = this.getRun(runId); + if (run.claimedBy && run.claimedBy !== runnerId && run.status !== "completed" && run.status !== "failed" && run.status !== "cancelled") throw new AgentRunError("runner-lease-conflict", `run ${runId} is already claimed`, { httpStatus: 409 }); + const next = this.updateRun(runId, { status: "claimed", claimedBy: runnerId, leaseExpiresAt: new Date(Date.now() + leaseMs).toISOString() }); + this.appendEvent(runId, "backend_status", { phase: "run-claimed", runnerId }); + return next; + } + + heartbeat(runId: string, runnerId: string, leaseMs: number): RunRecord { + const run = this.getRun(runId); + if (run.claimedBy !== runnerId) throw new AgentRunError("runner-lease-conflict", `run ${runId} is not claimed by ${runnerId}`, { httpStatus: 409 }); + return this.updateRun(runId, { leaseExpiresAt: new Date(Date.now() + leaseMs).toISOString() }); + } + + ackCommand(commandId: string): CommandRecord { + const command = this.getCommand(commandId); + const next = { ...command, state: "acknowledged" as const, acknowledgedAt: nowIso(), updatedAt: nowIso() }; + this.commands.set(commandId, next); + return next; + } + + appendEvent(runId: string, type: RunEvent["type"], payload: JsonRecord): RunEvent { + this.getRun(runId); + const events = this.eventsByRun.get(runId) ?? []; + const event: RunEvent = { id: newId("evt"), runId, seq: events.length + 1, type, payload: redactJson(payload), createdAt: nowIso() }; + events.push(event); + this.eventsByRun.set(runId, events); + return event; + } + + finishRun(runId: string, result: Pick): RunRecord { + const status = statusFromTerminal(result.terminalStatus); + const next = this.updateRun(runId, { status, terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage }); + this.appendEvent(runId, "terminal_status", { terminalStatus: result.terminalStatus, failureKind: result.failureKind, message: result.failureMessage }); + return next; + } + + backends(): JsonRecord[] { + return [{ profile: "codex" satisfies BackendProfile, protocol: "codex-app-server-jsonrpc-stdio", transport: "stdio", command: "codex app-server --listen stdio://", status: "registered" }]; + } + + private updateRun(runId: string, patch: Partial): RunRecord { + const run = this.getRun(runId); + const next = { ...run, ...patch, updatedAt: nowIso() }; + this.runs.set(runId, next); + return next; + } +} + +function statusFromTerminal(terminalStatus: TerminalStatus): RunRecord["status"] { + if (terminalStatus === "completed") return "completed"; + if (terminalStatus === "cancelled") return "cancelled"; + if (terminalStatus === "blocked") return "blocked"; + return "failed"; +} diff --git a/src/runner/main.ts b/src/runner/main.ts new file mode 100644 index 0000000..e2eecda --- /dev/null +++ b/src/runner/main.ts @@ -0,0 +1,19 @@ +import { runOnce, type RunnerOnceOptions } from "./run-once.js"; + +const managerUrl = process.env.AGENTRUN_MGR_URL; +const runId = process.env.AGENTRUN_RUN_ID; +if (!managerUrl || !runId) { + console.log(JSON.stringify({ ok: false, failureKind: "schema-invalid", message: "AGENTRUN_MGR_URL and AGENTRUN_RUN_ID are required" })); + process.exit(2); +} + +const options: RunnerOnceOptions = { + managerUrl, + runId, +}; +if (process.env.AGENTRUN_RUNNER_ID) options.runnerId = process.env.AGENTRUN_RUNNER_ID; +if (process.env.AGENTRUN_CODEX_COMMAND) options.codexCommand = process.env.AGENTRUN_CODEX_COMMAND; +if (process.env.AGENTRUN_CODEX_ARGS) options.codexArgs = JSON.parse(process.env.AGENTRUN_CODEX_ARGS) as string[]; +if (process.env.CODEX_HOME) options.codexHome = process.env.CODEX_HOME; +const result = await runOnce(options); +console.log(JSON.stringify({ ok: true, data: result })); diff --git a/src/runner/run-once.ts b/src/runner/run-once.ts new file mode 100644 index 0000000..c383808 --- /dev/null +++ b/src/runner/run-once.ts @@ -0,0 +1,35 @@ +import { ManagerClient } from "../mgr/client.js"; +import { runBackendTurn, type BackendAdapterOptions } from "../backend/adapter.js"; +import type { CommandRecord, JsonRecord, RunRecord, RunnerRecord } from "../common/types.js"; + +export interface RunnerOnceOptions extends BackendAdapterOptions { + managerUrl: string; + runId: string; + runnerId?: string; + attemptId?: string; + leaseMs?: number; +} + +export async function runOnce(options: RunnerOnceOptions): Promise { + const client = new ManagerClient(options.managerUrl); + const runner = await client.post("/api/v1/runners/register", { + id: options.runnerId ?? undefined, + runId: options.runId, + attemptId: options.attemptId ?? `attempt_${Date.now().toString(36)}`, + backendProfile: "codex", + placement: "host-process", + sourceCommit: process.env.AGENTRUN_SOURCE_COMMIT ?? "unknown", + } as JsonRecord) as unknown as RunnerRecord; + const claimed = await client.post(`/api/v1/runs/${options.runId}/claim`, { runnerId: runner.id, leaseMs: options.leaseMs ?? 60_000 }) as unknown as RunRecord; + const commandsResponse = await client.get(`/api/v1/runs/${options.runId}/commands?afterSeq=0&limit=20`) as { items?: CommandRecord[] }; + const command = commandsResponse.items?.find((item) => item.state === "pending" && item.type === "turn"); + if (!command) { + await client.patch(`/api/v1/runs/${options.runId}/status`, { terminalStatus: "blocked", failureKind: "schema-invalid", failureMessage: "no pending turn command" }); + return { runner, claimed, terminalStatus: "blocked", failureKind: "schema-invalid" }; + } + await client.post(`/api/v1/commands/${command.id}/ack`, {}); + const result = await runBackendTurn(claimed, command, options); + for (const event of result.events) await client.post(`/api/v1/runs/${options.runId}/events`, event as unknown as JsonRecord); + const finalRun = await client.patch(`/api/v1/runs/${options.runId}/status`, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage }) as unknown as RunRecord; + return { runner, commandId: command.id, terminalStatus: result.terminalStatus, failureKind: result.failureKind, run: finalRun } as JsonRecord; +} diff --git a/src/selftest/fake-codex-app-server.ts b/src/selftest/fake-codex-app-server.ts new file mode 100644 index 0000000..316a9b7 --- /dev/null +++ b/src/selftest/fake-codex-app-server.ts @@ -0,0 +1,48 @@ +import * as readline from "node:readline"; + +const rl = readline.createInterface({ input: process.stdin, crlfDelay: Infinity }); +let threadCounter = 0; +let turnCounter = 0; + +for await (const line of rl) { + const trimmed = String(line).trim(); + if (trimmed.length === 0) continue; + const message = JSON.parse(trimmed) as { id?: number; method?: string; params?: Record }; + if (message.method === "initialize") { + respond(message.id, { serverInfo: { name: "fake-codex-app-server", version: "self-test" } }); + continue; + } + if (message.method === "thread/start") { + threadCounter += 1; + const thread = { id: `thread_selftest_${threadCounter}` }; + notify("thread/started", { thread }); + respond(message.id, { thread }); + continue; + } + if (message.method === "thread/resume") { + const thread = { id: String(message.params?.threadId ?? "thread_selftest_resumed") }; + notify("thread/started", { thread }); + respond(message.id, { thread }); + continue; + } + if (message.method === "turn/start") { + turnCounter += 1; + const turn = { id: `turn_selftest_${turnCounter}`, status: "completed" }; + notify("turn/started", { turn }); + notify("item/agentMessage/delta", { itemId: "msg_selftest", delta: "fake codex stdio reply" }); + notify("item/commandExecution/outputDelta", { itemId: "cmd_selftest", delta: "Authorization: Bearer test-token\n" }); + notify("turn/completed", { turn }); + respond(message.id, { turn }); + continue; + } + respond(message.id, null, { code: -32601, message: `unsupported fake method ${message.method ?? "unknown"}` }); +} + +function respond(id: number | undefined, result: unknown, error?: unknown): void { + if (id === undefined) return; + process.stdout.write(`${JSON.stringify(error ? { id, error } : { id, result })}\n`); +} + +function notify(method: string, params: unknown): void { + process.stdout.write(`${JSON.stringify({ method, params })}\n`); +} diff --git a/src/selftest/run.ts b/src/selftest/run.ts new file mode 100644 index 0000000..655fd90 --- /dev/null +++ b/src/selftest/run.ts @@ -0,0 +1,65 @@ +import { mkdtemp, mkdir, writeFile, rm } from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { fileURLToPath } from "node:url"; +import assert from "node:assert/strict"; +import { startManagerServer } from "../mgr/server.js"; +import { ManagerClient } from "../mgr/client.js"; +import { runOnce } from "../runner/run-once.js"; +import { redactText } from "../common/redaction.js"; + +const root = path.resolve(path.dirname(fileURLToPath(import.meta.url)), "../.."); +const tmp = await mkdtemp(path.join(os.tmpdir(), "agentrun-selftest-")); + +try { + const codexHome = path.join(tmp, "codex-home"); + const workspace = path.join(tmp, "workspace"); + await mkdir(codexHome, { recursive: true }); + await mkdir(workspace, { recursive: true }); + await writeFile(path.join(codexHome, "auth.json"), JSON.stringify({ token: "test-token-material" })); + await writeFile(path.join(codexHome, "config.toml"), "model = \"gpt-test\"\n"); + await writeFile(path.join(workspace, "README.md"), "self-test workspace\n"); + + assert.equal(redactText("Authorization: Bearer abc123"), "Authorization: Bearer REDACTED"); + + const server = await startManagerServer({ port: 0, host: "127.0.0.1", sourceCommit: "self-test" }); + try { + const client = new ManagerClient(server.baseUrl); + const health = await client.get("/health/readiness") as { database?: { adapter?: string } }; + assert.equal(health.database?.adapter, "memory-self-test"); + const run = await client.post("/api/v1/runs", { + tenantId: "unidesk", + projectId: "pikasTech/unidesk", + workspaceRef: { kind: "host-path", path: workspace }, + providerId: "G14", + backendProfile: "codex", + executionPolicy: { + sandbox: "workspace-write", + approval: "never", + timeoutMs: 15_000, + network: "default", + secretScope: { allowCredentialEcho: false, providerCredentials: [{ profile: "codex", secretRef: { name: "agentrun-v01-provider-codex", keys: ["auth.json", "config.toml"], mountPath: codexHome } }] }, + }, + traceSink: null, + }) as { id: string }; + const command = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "hello" }, idempotencyKey: "selftest-turn" }) as { id: string }; + const duplicate = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "hello" }, idempotencyKey: "selftest-turn" }) as { id: string }; + assert.equal(duplicate.id, command.id); + const fakePath = path.join(root, "src/selftest/fake-codex-app-server.ts"); + const fakeCommand = process.env.AGENTRUN_SELFTEST_CODEX_COMMAND ?? process.execPath; + const fakeArgs = process.env.AGENTRUN_SELFTEST_CODEX_ARGS ? JSON.parse(process.env.AGENTRUN_SELFTEST_CODEX_ARGS) as string[] : [fakePath]; + const result = await runOnce({ managerUrl: server.baseUrl, runId: run.id, codexCommand: fakeCommand, codexArgs: fakeArgs, codexHome, env: { CODEX_HOME: codexHome } }); + assert.equal(result.terminalStatus, "completed"); + const events = await client.get(`/api/v1/runs/${run.id}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> }; + assert.ok(events.items?.some((event) => event.type === "assistant_message")); + assert.equal(JSON.stringify(events).includes("test-token-material"), false); + assert.equal(JSON.stringify(events).includes("Bearer test-token"), false); + const finalRun = await client.get(`/api/v1/runs/${run.id}`) as { terminalStatus?: string }; + assert.equal(finalRun.terminalStatus, "completed"); + console.log(JSON.stringify({ ok: true, tests: ["manager-memory-lifecycle", "codex-stdio-fake-turn", "redaction"], runId: run.id })); + } finally { + await new Promise((resolve) => server.server.close(() => resolve())); + } +} finally { + await rm(tmp, { recursive: true, force: true }); +} diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..faa3dd2 --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,17 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "NodeNext", + "moduleResolution": "NodeNext", + "lib": ["ES2022", "DOM"], + "strict": true, + "noImplicitReturns": true, + "noFallthroughCasesInSwitch": true, + "exactOptionalPropertyTypes": true, + "skipLibCheck": true, + "types": ["node"], + "rootDir": ".", + "noEmit": true + }, + "include": ["scripts/**/*.ts", "src/**/*.ts"] +}