feat: add v0.1 runtime skeleton

This commit is contained in:
Codex
2026-05-29 10:52:41 +08:00
parent 4b33e67484
commit 5deb9fa7fd
20 changed files with 1238 additions and 0 deletions
+1
View File
@@ -1,6 +1,7 @@
.state/
logs/
node_modules/
package-lock.json
dist/
build/
coverage/
+57
View File
@@ -0,0 +1,57 @@
{
"lane": "v0.1",
"runtimeNamespace": "agentrun-v01",
"gitopsBranch": "v0.1-gitops",
"runtimePath": "deploy/gitops/g14/runtime-v01",
"services": [
{
"id": "agentrun-mgr",
"component": "manager",
"enabled": true,
"replicas": 1,
"ports": [{ "name": "http", "containerPort": 8080 }],
"health": {
"livePath": "/health/live",
"readinessPath": "/health/readiness"
},
"env": [
{ "name": "AGENTRUN_LANE", "value": "v0.1" },
{ "name": "AGENTRUN_RUNTIME_NAMESPACE", "value": "agentrun-v01" },
{ "name": "DATABASE_URL", "secretRef": { "name": "agentrun-v01-mgr-db", "key": "DATABASE_URL" } }
],
"resources": {
"requests": { "cpu": "100m", "memory": "256Mi" },
"limits": { "cpu": "800m", "memory": "1Gi" }
}
},
{
"id": "agentrun-runner",
"component": "runner-job-template",
"enabled": true,
"serviceAccount": "agentrun-v01-runner",
"secretMounts": [
{
"name": "codex-home",
"secretRef": { "name": "agentrun-v01-provider-codex", "keys": ["auth.json", "config.toml"] },
"mountPath": "/home/agentrun/.codex",
"readOnly": true
}
],
"env": [
{ "name": "HOME", "value": "/home/agentrun" },
{ "name": "AGENTRUN_LANE", "value": "v0.1" }
],
"resources": {
"requests": { "cpu": "250m", "memory": "512Mi" },
"limits": { "cpu": "2", "memory": "4Gi" }
}
}
],
"postgres": {
"statefulSet": "agentrun-v01-postgres",
"service": "agentrun-v01-postgres",
"database": "agentrun_v01",
"secretRef": { "name": "agentrun-v01-postgres" },
"dataPvc": "agentrun-v01-postgres-data"
}
}
+16
View File
@@ -0,0 +1,16 @@
{
"name": "agentrun",
"version": "0.1.0",
"private": true,
"type": "module",
"scripts": {
"check": "tsc --noEmit",
"self-test": "bun run src/selftest/run.ts",
"test": "bun run src/selftest/run.ts",
"cli": "bun scripts/agentrun-cli.ts"
},
"devDependencies": {
"@types/node": "^22.10.0",
"typescript": "^5.8.3"
}
}
+4
View File
@@ -0,0 +1,4 @@
#!/usr/bin/env bun
import { runCli } from "./src/cli.js";
await runCli(process.argv.slice(2));
+134
View File
@@ -0,0 +1,134 @@
import { readFile } from "node:fs/promises";
import { startManagerServer } from "../../src/mgr/server.js";
import { ManagerClient } from "../../src/mgr/client.js";
import { runOnce } from "../../src/runner/run-once.js";
import type { JsonRecord, JsonValue } from "../../src/common/types.js";
import { AgentRunError, errorToJson } from "../../src/common/errors.js";
import type { RunnerOnceOptions } from "../../src/runner/run-once.js";
interface ParsedArgs {
positional: string[];
flags: Map<string, string | boolean>;
}
export async function runCli(argv: string[]): Promise<void> {
try {
const result = await dispatch(parseArgs(argv));
print({ ok: true, data: result });
} catch (error) {
const status = error instanceof AgentRunError ? error.httpStatus : 1;
print({ ok: false, ...(error instanceof AgentRunError ? { failureKind: error.failureKind, message: error.message } : { failureKind: "infra-failed", message: error instanceof Error ? error.message : String(error) }), error: errorToJson(error) });
process.exitCode = status === 0 ? 1 : status > 255 ? 1 : status;
}
}
async function dispatch(args: ParsedArgs): Promise<JsonValue> {
const [group, command, id] = args.positional;
if (!group || group === "help") return help();
if (group === "server" && command === "start") return startServer(args);
if (group === "server" && command === "status") return client(args).get("/health/readiness");
if (group === "backends" && command === "list") return client(args).get("/api/v1/backends");
if (group === "runs" && command === "create") return client(args).post("/api/v1/runs", await jsonFile(args));
if (group === "runs" && command === "show" && id) return client(args).get(`/api/v1/runs/${encodeURIComponent(id)}`);
if (group === "runs" && command === "events" && id) return client(args).get(`/api/v1/runs/${encodeURIComponent(id)}/events?afterSeq=${flag(args, "after-seq", "0")}&limit=${flag(args, "limit", "100")}`);
if (group === "commands" && command === "create" && id) {
const body = await jsonFile(args);
if (!body.type) body.type = flag(args, "type", "turn");
const idempotencyKey = optionalFlag(args, "idempotency-key");
if (idempotencyKey) body.idempotencyKey = idempotencyKey;
return client(args).post(`/api/v1/runs/${encodeURIComponent(id)}/commands`, body);
}
if (group === "commands" && command === "show" && id) {
const runId = flag(args, "run-id", "");
if (!runId) throw new AgentRunError("schema-invalid", "commands show requires --run-id", { httpStatus: 2 });
return client(args).get(`/api/v1/runs/${encodeURIComponent(runId)}/commands/${encodeURIComponent(id)}`);
}
if (group === "runner" && command === "start") {
const runId = flag(args, "run-id", "");
if (!runId) throw new AgentRunError("schema-invalid", "runner start requires --run-id", { httpStatus: 2 });
const options: RunnerOnceOptions = {
managerUrl: managerUrl(args),
runId,
};
const runnerId = optionalFlag(args, "runner-id");
const codexCommand = optionalFlag(args, "codex-command");
const codexHome = optionalFlag(args, "codex-home") ?? process.env.CODEX_HOME;
if (runnerId) options.runnerId = runnerId;
if (codexCommand) options.codexCommand = codexCommand;
if (codexHome) options.codexHome = codexHome;
return runOnce(options) as unknown as JsonValue;
}
throw new AgentRunError("schema-invalid", `unsupported command: ${args.positional.join(" ")}`, { httpStatus: 2 });
}
async function startServer(args: ParsedArgs): Promise<JsonRecord> {
const port = Number(flag(args, "port", "8080"));
const host = flag(args, "host", "0.0.0.0");
const started = await startManagerServer({ port, host });
return { serviceId: "agentrun-mgr", baseUrl: started.baseUrl, pid: process.pid, note: "foreground process; use Kubernetes/Tekton for v0.1 runtime" };
}
function client(args: ParsedArgs): ManagerClient {
return new ManagerClient(managerUrl(args));
}
function managerUrl(args: ParsedArgs): string {
return optionalFlag(args, "manager-url") ?? process.env.AGENTRUN_MGR_URL ?? "http://127.0.0.1:8080";
}
async function jsonFile(args: ParsedArgs): Promise<JsonRecord> {
const file = optionalFlag(args, "json-file");
if (!file) throw new AgentRunError("schema-invalid", "--json-file is required", { httpStatus: 2 });
const value = JSON.parse(await readFile(file, "utf8")) as unknown;
if (typeof value === "object" && value !== null && !Array.isArray(value)) return value as JsonRecord;
throw new AgentRunError("schema-invalid", "json file must contain an object", { httpStatus: 2 });
}
function parseArgs(argv: string[]): ParsedArgs {
const positional: string[] = [];
const flags = new Map<string, string | boolean>();
for (let index = 0; index < argv.length; index += 1) {
const item = argv[index] ?? "";
if (!item.startsWith("--")) {
positional.push(item);
continue;
}
const key = item.slice(2);
const next = argv[index + 1];
if (next === undefined || next.startsWith("--")) flags.set(key, true);
else {
flags.set(key, next);
index += 1;
}
}
return { positional, flags };
}
function flag(args: ParsedArgs, name: string, fallback: string): string {
const value = args.flags.get(name);
return typeof value === "string" ? value : fallback;
}
function optionalFlag(args: ParsedArgs, name: string): string | null {
const value = args.flags.get(name);
return typeof value === "string" && value.length > 0 ? value : null;
}
function help(): JsonRecord {
return {
commands: [
"runs create --json-file <run.json>",
"runs show <runId>",
"runs events <runId> --after-seq <n> --limit <n>",
"commands create <runId> --type turn --json-file <payload.json>",
"commands show <commandId> --run-id <runId>",
"runner start --run-id <runId>",
"backends list",
"server start|status",
],
};
}
function print(value: JsonRecord): void {
process.stdout.write(`${JSON.stringify(value)}\n`);
}
+29
View File
@@ -0,0 +1,29 @@
import type { BackendTurnResult, CommandRecord, RunRecord } from "../common/types.js";
import { runCodexStdioTurn, type CodexStdioTurnOptions } from "./codex-stdio.js";
export interface BackendAdapterOptions {
codexCommand?: string;
codexArgs?: string[];
codexHome?: string;
env?: NodeJS.ProcessEnv;
}
export async function runBackendTurn(run: RunRecord, command: CommandRecord, options: BackendAdapterOptions = {}): Promise<BackendTurnResult> {
if (run.backendProfile !== "codex") {
return { terminalStatus: "failed", failureKind: "backend-failed", failureMessage: `unsupported backendProfile ${run.backendProfile}`, events: [{ type: "error", payload: { failureKind: "backend-failed", backendProfile: run.backendProfile } }] };
}
const prompt = typeof command.payload.prompt === "string" ? command.payload.prompt : JSON.stringify(command.payload);
const turnOptions: CodexStdioTurnOptions = {
prompt,
cwd: typeof run.workspaceRef.path === "string" ? run.workspaceRef.path : process.cwd(),
approvalPolicy: run.executionPolicy.approval,
sandbox: run.executionPolicy.sandbox,
timeoutMs: run.executionPolicy.timeoutMs,
};
if (typeof command.payload.model === "string") turnOptions.model = command.payload.model;
if (options.codexCommand) turnOptions.command = options.codexCommand;
if (options.codexArgs) turnOptions.args = options.codexArgs;
if (options.env) turnOptions.env = options.env;
if (options.codexHome) turnOptions.codexHome = options.codexHome;
return runCodexStdioTurn(turnOptions);
}
+246
View File
@@ -0,0 +1,246 @@
import { spawn, type ChildProcessWithoutNullStreams } from "node:child_process";
import { accessSync, constants as fsConstants } from "node:fs";
import * as readline from "node:readline";
import type { BackendEvent, BackendTurnResult, FailureKind, JsonRecord, JsonValue, TerminalStatus } from "../common/types.js";
import { redactJson, redactText } from "../common/redaction.js";
export interface CodexStdioTurnOptions {
prompt: string;
cwd: string;
model?: string;
approvalPolicy: string;
sandbox: string;
timeoutMs: number;
command?: string;
args?: string[];
env?: NodeJS.ProcessEnv;
codexHome?: string;
}
interface PendingRequest {
resolve: (value: unknown) => void;
reject: (error: Error) => void;
}
export class CodexStdioClient {
private readonly child: ChildProcessWithoutNullStreams;
private readonly pending = new Map<number, PendingRequest>();
private readonly stderrChunks: Buffer[] = [];
private nextId = 1;
private closed = false;
readonly closedPromise: Promise<{ code: number | null; signal: string | null; stderrTail: string }>;
private closeResolve!: (value: { code: number | null; signal: string | null; stderrTail: string }) => void;
constructor(options: { command?: string; args?: string[]; cwd: string; env?: NodeJS.ProcessEnv; onNotification: (message: JsonRecord) => void }) {
this.closedPromise = new Promise((resolve) => { this.closeResolve = resolve; });
this.child = spawn(options.command ?? "codex", options.args ?? ["app-server", "--listen", "stdio://"], {
cwd: options.cwd,
env: options.env ?? process.env,
stdio: "pipe",
});
this.child.stderr.on("data", (chunk: Buffer) => {
this.stderrChunks.push(chunk);
while (Buffer.concat(this.stderrChunks).length > 64_000) this.stderrChunks.shift();
});
const rl = readline.createInterface({ input: this.child.stdout, crlfDelay: Infinity });
void this.readLines(rl, options.onNotification);
this.child.on("close", (code, signal) => this.handleClose(code, signal));
this.child.on("error", (error) => this.handleClose(127, error.message));
}
request(method: string, params: JsonRecord): Promise<unknown> {
if (this.closed) return Promise.reject(new Error("codex app-server is closed"));
const id = this.nextId++;
const message = { id, method, params };
return new Promise((resolve, reject) => {
this.pending.set(id, { resolve, reject });
this.child.stdin.write(`${JSON.stringify(message)}\n`);
});
}
notify(method: string, params: JsonRecord = {}): void {
if (!this.closed) this.child.stdin.write(`${JSON.stringify({ method, params })}\n`);
}
stop(): void {
if (this.closed) return;
this.child.kill("SIGTERM");
setTimeout(() => {
if (!this.closed) this.child.kill("SIGKILL");
}, 1500).unref?.();
}
private async readLines(rl: readline.Interface, onNotification: (message: JsonRecord) => void): Promise<void> {
try {
for await (const line of rl) {
const trimmed = String(line).trim();
if (trimmed.length === 0) continue;
const message = JSON.parse(trimmed) as JsonRecord;
const id = typeof message.id === "number" ? message.id : null;
const method = typeof message.method === "string" ? message.method : null;
if (id !== null && method === null) {
const pending = this.pending.get(id);
if (!pending) continue;
this.pending.delete(id);
if (message.error !== undefined) pending.reject(new Error(JSON.stringify(redactJson(message.error))));
else pending.resolve(message.result);
continue;
}
if (id !== null && method !== null) {
this.handleServerRequest(id, method);
continue;
}
if (method !== null) onNotification(message);
}
} catch (error) {
this.rejectAll(error instanceof Error ? error : new Error(String(error)));
}
}
private handleServerRequest(id: number, method: string): void {
if (method === "item/commandExecution/requestApproval" || method === "item/fileChange/requestApproval") {
this.child.stdin.write(`${JSON.stringify({ id, result: { decision: "decline" } })}\n`);
return;
}
this.child.stdin.write(`${JSON.stringify({ id, error: { code: -32601, message: `Unsupported client-side request: ${method}` } })}\n`);
}
private rejectAll(error: Error): void {
for (const pending of this.pending.values()) pending.reject(error);
this.pending.clear();
}
private handleClose(code: number | null, signal: string | null): void {
if (this.closed) return;
this.closed = true;
const stderrTail = redactText(Buffer.concat(this.stderrChunks).toString("utf8").slice(-8000));
this.rejectAll(new Error(`codex app-server closed code=${code} signal=${signal}`));
this.closeResolve({ code, signal, stderrTail });
}
}
export async function runCodexStdioTurn(options: CodexStdioTurnOptions): Promise<BackendTurnResult> {
const secretFailure = codexHomeReadiness(options.codexHome ?? options.env?.CODEX_HOME ?? `${options.env?.HOME ?? process.env.HOME ?? ""}/.codex`);
if (secretFailure) return secretFailure;
const events: BackendEvent[] = [{ type: "backend_status", payload: { phase: "codex-app-server-starting", protocol: "codex-app-server-jsonrpc-stdio" } }];
let assistantText = "";
let threadId: string | undefined;
let turnId: string | undefined;
let terminal: { status: TerminalStatus; failureKind: FailureKind | null; message: string | null } | null = null;
let terminalResolve!: () => void;
const terminalPromise = new Promise<void>((resolve) => { terminalResolve = resolve; });
const clientOptions: ConstructorParameters<typeof CodexStdioClient>[0] = {
cwd: options.cwd,
env: childEnv(options),
onNotification: (message) => {
const normalized = normalizeCodexNotification(message);
if (normalized.threadId) threadId = normalized.threadId;
if (normalized.turnId) turnId = normalized.turnId;
if (normalized.assistantDelta) assistantText += normalized.assistantDelta;
events.push(...normalized.events);
if (normalized.terminal) {
terminal = normalized.terminal;
terminalResolve();
}
},
};
if (options.command) clientOptions.command = options.command;
if (options.args) clientOptions.args = options.args;
const client = new CodexStdioClient(clientOptions);
const timeout = setTimeout(() => {
if (!terminal) {
terminal = { status: "failed", failureKind: "backend-timeout", message: `codex stdio turn timed out after ${options.timeoutMs}ms` };
events.push({ type: "error", payload: terminal });
client.stop();
terminalResolve();
}
}, options.timeoutMs);
try {
await client.request("initialize", { clientInfo: { name: "agentrun", title: "AgentRun", version: "0.1.0" }, capabilities: { experimentalApi: true } });
const threadResponse = asResponseRecord(await client.request("thread/start", { model: options.model ?? "default", cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox, serviceName: "agentrun" }));
threadId = stringAt(asRecordAt(threadResponse, "thread"), "id") ?? threadId;
const turnResponse = asResponseRecord(await client.request("turn/start", { threadId: threadId ?? "", input: [{ type: "text", text: options.prompt, text_elements: [] }], cwd: options.cwd, approvalPolicy: options.approvalPolicy, model: options.model ?? "default" }));
turnId = stringAt(asRecordAt(turnResponse, "turn"), "id") ?? turnId;
await Promise.race([terminalPromise, client.closedPromise]);
if (!terminal) terminal = { status: "failed", failureKind: "backend-protocol-error", message: "codex app-server closed before turn/completed" };
} catch (error) {
terminal = { status: "failed", failureKind: "backend-protocol-error", message: error instanceof Error ? error.message : String(error) };
events.push({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message } });
} finally {
clearTimeout(timeout);
client.stop();
}
if (assistantText.trim().length > 0) events.push({ type: "assistant_message", payload: { text: assistantText } });
events.push({ type: "terminal_status", payload: { terminalStatus: terminal.status, failureKind: terminal.failureKind, message: terminal.message } });
return { terminalStatus: terminal.status, failureKind: terminal.failureKind, failureMessage: terminal.message, events: events.map((event) => ({ ...event, payload: redactJson(event.payload) })), ...(threadId ? { threadId } : {}), ...(turnId ? { turnId } : {}) };
}
function codexHomeReadiness(codexHome: string): BackendTurnResult | null {
try {
accessSync(`${codexHome}/auth.json`, fsConstants.R_OK);
accessSync(`${codexHome}/config.toml`, fsConstants.R_OK);
return null;
} catch {
return {
terminalStatus: "blocked",
failureKind: "secret-unavailable",
failureMessage: "Codex auth.json or config.toml projection is not readable",
events: [
{ type: "error", payload: { failureKind: "secret-unavailable", credentialSource: { codexHome, valuesPrinted: false } } },
{ type: "terminal_status", payload: { terminalStatus: "blocked", failureKind: "secret-unavailable" } },
],
};
}
}
function normalizeCodexNotification(message: JsonRecord): { events: BackendEvent[]; assistantDelta?: string; threadId?: string; turnId?: string; terminal?: { status: TerminalStatus; failureKind: FailureKind | null; message: string | null } } {
const method = typeof message.method === "string" ? message.method : "unknown";
const params = asRecordAt(message, "params");
if (method === "thread/started") {
const threadId = stringAt(asRecordAt(params, "thread"), "id");
return { events: [{ type: "backend_status", payload: { phase: method, threadId } }], ...(threadId ? { threadId } : {}) };
}
if (method === "turn/started") {
const turnId = stringAt(asRecordAt(params, "turn"), "id");
return { events: [{ type: "backend_status", payload: { phase: method, turnId } }], ...(turnId ? { turnId } : {}) };
}
if (method === "item/agentMessage/delta") return { events: [], assistantDelta: typeof params.delta === "string" ? params.delta : "" };
if (method === "item/commandExecution/outputDelta") return { events: [{ type: "command_output", payload: { stream: "stdout", text: typeof params.delta === "string" ? params.delta : "" } }] };
if (method === "item/started" || method === "item/completed") return { events: [{ type: "tool_call", payload: { method, item: redactJson(asRecordAt(params, "item")) } }] };
if (method === "error") return { events: [{ type: "error", payload: { failureKind: "backend-failed", error: redactJson(params.error ?? params) } }] };
if (method === "turn/completed") {
const turn = asRecordAt(params, "turn");
const status = terminalStatusFromValue(turn.status);
const error = asRecordAt(turn, "error");
const messageText = typeof error.message === "string" ? error.message : null;
return { events: [{ type: "backend_status", payload: { phase: method, terminalStatus: status } }], terminal: { status, failureKind: status === "completed" ? null : "backend-failed", message: messageText } };
}
return { events: [{ type: "backend_status", payload: { phase: method } }] };
}
function terminalStatusFromValue(value: unknown): TerminalStatus {
if (value === "completed") return "completed";
if (value === "cancelled" || value === "canceled") return "cancelled";
if (value === "blocked") return "blocked";
return "failed";
}
function childEnv(options: CodexStdioTurnOptions): NodeJS.ProcessEnv {
const env: NodeJS.ProcessEnv = { ...process.env, ...options.env };
const codexHome = options.codexHome ?? options.env?.CODEX_HOME;
if (codexHome) env.CODEX_HOME = codexHome;
return env;
}
function asResponseRecord(value: unknown): JsonRecord {
return typeof value === "object" && value !== null && !Array.isArray(value) ? value as JsonRecord : {};
}
function asRecordAt(value: JsonRecord, key: string): JsonRecord {
const next = value[key];
return typeof next === "object" && next !== null && !Array.isArray(next) ? next as JsonRecord : {};
}
function stringAt(value: JsonRecord, key: string): string | null {
return typeof value[key] === "string" ? value[key] : null;
}
+28
View File
@@ -0,0 +1,28 @@
import type { FailureKind, JsonRecord } from "./types.js";
export class AgentRunError extends Error {
readonly failureKind: FailureKind;
readonly details: JsonRecord | null;
readonly httpStatus: number;
constructor(failureKind: FailureKind, message: string, options: { httpStatus?: number; details?: JsonRecord } = {}) {
super(message);
this.name = "AgentRunError";
this.failureKind = failureKind;
this.httpStatus = options.httpStatus ?? 400;
this.details = options.details ?? null;
}
}
export function errorToJson(error: unknown): JsonRecord {
if (error instanceof AgentRunError) {
return {
name: error.name,
failureKind: error.failureKind,
message: error.message,
details: error.details,
};
}
if (error instanceof Error) return { name: error.name, message: error.message, stack: error.stack ?? null };
return { name: "UnknownError", message: String(error) };
}
+22
View File
@@ -0,0 +1,22 @@
export function redactText(value: string): string {
return value
.replace(/(authorization\s*[:=]\s*)(bearer\s+)?[A-Za-z0-9._~+/=-]+/giu, "$1$2REDACTED")
.replace(/((?:api[_-]?key|token|password|secret)\s*[:=]\s*)[A-Za-z0-9._~+/=-]+/giu, "$1REDACTED")
.replace(/(postgres(?:ql)?:\/\/[^:\s/@]+:)[^@\s]+(@)/giu, "$1REDACTED$2")
.replace(/(https?:\/\/[^:\s/@]+:)[^@\s]+(@)/giu, "$1REDACTED$2");
}
export function redactJson<T>(value: T): T {
if (typeof value === "string") return redactText(value) as T;
if (Array.isArray(value)) return value.map((item) => redactJson(item)) as T;
if (typeof value !== "object" || value === null) return value;
const result: Record<string, unknown> = {};
for (const [key, entry] of Object.entries(value)) {
if (/auth|authorization|api[_-]?key|token|password|secret|credential|dsn/iu.test(key)) {
result[key] = typeof entry === "boolean" ? entry : "REDACTED";
continue;
}
result[key] = redactJson(entry);
}
return result as T;
}
+137
View File
@@ -0,0 +1,137 @@
export type JsonPrimitive = string | number | boolean | null;
export type JsonValue = JsonPrimitive | JsonValue[] | { [key: string]: JsonValue };
export type JsonRecord = { [key: string]: JsonValue };
export type FailureKind =
| "schema-invalid"
| "tenant-policy-denied"
| "secret-unavailable"
| "runner-lease-conflict"
| "backend-failed"
| "backend-protocol-error"
| "backend-timeout"
| "provider-auth-failed"
| "provider-rate-limited"
| "infra-failed"
| "cancelled";
export type RunStatus = "pending" | "claimed" | "running" | "completed" | "failed" | "blocked" | "cancelled";
export type CommandState = "pending" | "acknowledged" | "completed" | "failed" | "cancelled";
export type TerminalStatus = "completed" | "failed" | "blocked" | "cancelled";
export type BackendProfile = "codex";
export interface WorkspaceRef extends JsonRecord {
kind: "git-worktree" | "host-path" | "kubernetes-pvc" | "opaque";
path?: string;
repo?: string;
branch?: string;
}
export interface SecretRef extends JsonRecord {
namespace?: string;
name: string;
keys?: string[];
mountPath?: string;
}
export interface ExecutionPolicy extends JsonRecord {
sandbox: string;
approval: string;
timeoutMs: number;
network: string;
secretScope: {
providerCredentials?: Array<{
profile: BackendProfile | string;
secretRef: SecretRef;
}>;
allowCredentialEcho?: false;
};
}
export interface CreateRunInput extends JsonRecord {
tenantId: string;
projectId: string;
workspaceRef: WorkspaceRef;
providerId: string;
backendProfile: BackendProfile;
executionPolicy: ExecutionPolicy;
traceSink: JsonValue;
}
export interface RunRecord extends CreateRunInput {
id: string;
status: RunStatus;
terminalStatus: TerminalStatus | null;
failureKind: FailureKind | null;
failureMessage: string | null;
createdAt: string;
updatedAt: string;
claimedBy: string | null;
leaseExpiresAt: string | null;
}
export interface CreateCommandInput extends JsonRecord {
type: "turn" | "interrupt";
payload: JsonRecord;
idempotencyKey?: string;
}
export interface CommandRecord extends CreateCommandInput {
id: string;
runId: string;
seq: number;
state: CommandState;
payloadHash: string;
createdAt: string;
updatedAt: string;
acknowledgedAt: string | null;
}
export type EventType = "backend_status" | "assistant_message" | "tool_call" | "command_output" | "diff" | "error" | "terminal_status";
export interface RunEvent extends JsonRecord {
id: string;
runId: string;
seq: number;
type: EventType;
payload: JsonRecord;
createdAt: string;
}
export interface RunnerRecord extends JsonRecord {
id: string;
runId?: string;
attemptId?: string;
backendProfile?: BackendProfile;
placement?: string;
sourceCommit?: string;
registeredAt: string;
heartbeatAt: string;
}
export interface BackendEvent {
type: EventType;
payload: JsonRecord;
}
export interface BackendTurnResult {
terminalStatus: TerminalStatus;
failureKind: FailureKind | null;
failureMessage: string | null;
events: BackendEvent[];
threadId?: string;
turnId?: string;
}
export interface ApiErrorBody extends JsonRecord {
ok: false;
failureKind: FailureKind;
message: string;
traceId: string;
}
export interface ApiOkBody<T extends JsonValue = JsonValue> extends JsonRecord {
ok: true;
data: T;
traceId: string;
}
+88
View File
@@ -0,0 +1,88 @@
import { createHash, randomUUID } from "node:crypto";
import type { BackendProfile, CreateCommandInput, CreateRunInput, ExecutionPolicy, JsonRecord, JsonValue } from "./types.js";
import { AgentRunError } from "./errors.js";
const allowedTenants = new Set(["unidesk", "hwlab"]);
const allowedBackends = new Set<BackendProfile>(["codex"]);
export function nowIso(): string {
return new Date().toISOString();
}
export function newId(prefix: string): string {
return `${prefix}_${randomUUID().replace(/-/gu, "")}`;
}
export function stableHash(value: JsonValue): string {
return createHash("sha256").update(JSON.stringify(sortJson(value))).digest("hex");
}
function sortJson(value: JsonValue): JsonValue {
if (Array.isArray(value)) return value.map(sortJson);
if (typeof value !== "object" || value === null) return value;
return Object.fromEntries(Object.entries(value).sort(([a], [b]) => a.localeCompare(b)).map(([key, entry]) => [key, sortJson(entry)]));
}
export function asRecord(value: unknown, fieldName: string): JsonRecord {
if (typeof value === "object" && value !== null && !Array.isArray(value)) return value as JsonRecord;
throw new AgentRunError("schema-invalid", `${fieldName} must be an object`, { httpStatus: 400 });
}
function requiredString(record: JsonRecord, key: string): string {
const value = record[key];
if (typeof value !== "string" || value.trim().length === 0) throw new AgentRunError("schema-invalid", `${key} is required`, { httpStatus: 400 });
return value.trim();
}
function requiredRecord(record: JsonRecord, key: string): JsonRecord {
return asRecord(record[key], key);
}
export function validateCreateRun(input: unknown): CreateRunInput {
const record = asRecord(input, "run");
const tenantId = requiredString(record, "tenantId");
if (!allowedTenants.has(tenantId)) throw new AgentRunError("tenant-policy-denied", `tenantId ${tenantId} is not allowed`, { httpStatus: 403 });
const backendProfile = requiredString(record, "backendProfile") as BackendProfile;
if (!allowedBackends.has(backendProfile)) throw new AgentRunError("schema-invalid", `backendProfile ${backendProfile} is not supported in v0.1`, { httpStatus: 400 });
const executionPolicy = validateExecutionPolicy(requiredRecord(record, "executionPolicy"));
return {
tenantId,
projectId: requiredString(record, "projectId"),
workspaceRef: requiredRecord(record, "workspaceRef") as CreateRunInput["workspaceRef"],
providerId: requiredString(record, "providerId"),
backendProfile,
executionPolicy,
traceSink: record.traceSink ?? null,
};
}
export function validateExecutionPolicy(record: JsonRecord): ExecutionPolicy {
const timeout = record.timeoutMs;
if (typeof timeout !== "number" || !Number.isFinite(timeout) || timeout <= 0) throw new AgentRunError("schema-invalid", "executionPolicy.timeoutMs must be a positive number", { httpStatus: 400 });
const secretScope = asRecord(record.secretScope ?? {}, "executionPolicy.secretScope");
if (secretScope.allowCredentialEcho !== undefined && secretScope.allowCredentialEcho !== false) throw new AgentRunError("tenant-policy-denied", "allowCredentialEcho must be false", { httpStatus: 403 });
const providerCredentials = Array.isArray(secretScope.providerCredentials) ? secretScope.providerCredentials : [];
for (const credential of providerCredentials) {
const item = asRecord(credential, "providerCredential");
const secretRef = asRecord(item.secretRef, "providerCredential.secretRef");
if (typeof secretRef.name !== "string" || secretRef.name.length === 0) throw new AgentRunError("schema-invalid", "provider credential secretRef.name is required", { httpStatus: 400 });
}
const secretScopeResult: ExecutionPolicy["secretScope"] = { allowCredentialEcho: false };
if (providerCredentials.length > 0) secretScopeResult.providerCredentials = providerCredentials as NonNullable<ExecutionPolicy["secretScope"]["providerCredentials"]>;
return {
sandbox: requiredString(record, "sandbox"),
approval: requiredString(record, "approval"),
timeoutMs: timeout,
network: requiredString(record, "network"),
secretScope: secretScopeResult,
};
}
export function validateCreateCommand(input: unknown): CreateCommandInput {
const record = asRecord(input, "command");
const type = requiredString(record, "type");
if (type !== "turn" && type !== "interrupt") throw new AgentRunError("schema-invalid", `command type ${type} is not supported`, { httpStatus: 400 });
const payload = asRecord(record.payload ?? {}, "payload");
const idempotencyKey = typeof record.idempotencyKey === "string" && record.idempotencyKey.trim().length > 0 ? record.idempotencyKey.trim() : undefined;
return { type, payload, ...(idempotencyKey ? { idempotencyKey } : {}) };
}
+32
View File
@@ -0,0 +1,32 @@
import type { JsonRecord, JsonValue } from "../common/types.js";
import { AgentRunError } from "../common/errors.js";
export class ManagerClient {
constructor(readonly baseUrl: string) {}
async get(path: string): Promise<JsonValue> {
return this.request("GET", path);
}
async post(path: string, body: JsonValue): Promise<JsonValue> {
return this.request("POST", path, body);
}
async patch(path: string, body: JsonValue): Promise<JsonValue> {
return this.request("PATCH", path, body);
}
private async request(method: string, path: string, body?: JsonValue): Promise<JsonValue> {
const init: RequestInit = { method };
if (body !== undefined) {
init.headers = { "content-type": "application/json" };
init.body = JSON.stringify(body);
}
const response = await fetch(new URL(path, this.baseUrl), init);
const text = await response.text();
if (text.trim().length === 0) throw new AgentRunError("infra-failed", `manager returned empty response for ${method} ${path}`, { httpStatus: 502 });
const envelope = JSON.parse(text) as JsonRecord;
if (envelope.ok !== true) throw new AgentRunError(typeof envelope.failureKind === "string" ? envelope.failureKind as never : "infra-failed", typeof envelope.message === "string" ? envelope.message : `manager request failed: ${method} ${path}`, { httpStatus: response.status, details: envelope });
return envelope.data ?? null;
}
}
+6
View File
@@ -0,0 +1,6 @@
import { startManagerServer } from "./server.js";
const port = Number(process.env.PORT ?? process.env.AGENTRUN_MGR_PORT ?? "8080");
const host = process.env.HOST ?? "0.0.0.0";
const started = await startManagerServer({ port, host });
console.log(JSON.stringify({ ok: true, serviceId: "agentrun-mgr", baseUrl: started.baseUrl }));
+116
View File
@@ -0,0 +1,116 @@
import type { Server } from "node:http";
import { createServer } from "node:http";
import type { AddressInfo } from "node:net";
import type { AgentRunStore } from "./store.js";
import { MemoryAgentRunStore } from "./store.js";
import { AgentRunError, errorToJson } from "../common/errors.js";
import { asRecord, validateCreateCommand, validateCreateRun } from "../common/validation.js";
import type { ApiErrorBody, ApiOkBody, JsonRecord, JsonValue, RunEvent } from "../common/types.js";
export interface ManagerServerOptions {
store?: AgentRunStore;
port?: number;
host?: string;
sourceCommit?: string;
}
export interface StartedManagerServer {
server: Server;
baseUrl: string;
store: AgentRunStore;
}
export async function startManagerServer(options: ManagerServerOptions = {}): Promise<StartedManagerServer> {
const store = options.store ?? new MemoryAgentRunStore();
const sourceCommit = options.sourceCommit ?? process.env.AGENTRUN_SOURCE_COMMIT ?? "unknown";
const server = createServer(async (req, res) => {
const traceId = `trc_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}`;
try {
const method = req.method ?? "GET";
const url = new URL(req.url ?? "/", "http://agentrun.local");
const data = await route({ method, url, body: await readBody(req), store, sourceCommit });
writeJson(res, 200, { ok: true, data, traceId });
} catch (error) {
const agentError = normalizeError(error);
writeJson(res, agentError.httpStatus, { ok: false, failureKind: agentError.failureKind, message: agentError.message, traceId, error: errorToJson(error) });
}
});
await new Promise<void>((resolve) => server.listen(options.port ?? 0, options.host ?? "127.0.0.1", resolve));
const address = server.address() as AddressInfo;
return { server, baseUrl: `http://${address.address}:${address.port}`, store };
}
async function readBody(req: import("node:http").IncomingMessage): Promise<unknown> {
if (req.method === "GET" || req.method === "HEAD") return null;
const chunks: Buffer[] = [];
for await (const chunk of req) chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
const text = Buffer.concat(chunks).toString("utf8").trim();
if (text.length === 0) return null;
return JSON.parse(text) as unknown;
}
async function route({ method, url, body, store, sourceCommit }: { method: string; url: URL; body: unknown; store: AgentRunStore; sourceCommit: string }): Promise<JsonValue> {
const path = url.pathname;
if (method === "GET" && (path === "/health" || path === "/health/live" || path === "/health/readiness")) {
return { serviceId: "agentrun-mgr", live: true, ready: true, database: { adapter: "memory-self-test", migrationReady: true }, sourceCommit, secretRefs: { valuesPrinted: false } };
}
if (method === "GET" && path === "/api/v1/backends") return { items: store.backends() };
if (method === "POST" && path === "/api/v1/runs") return store.createRun(validateCreateRun(body)) as unknown as JsonValue;
const runMatch = path.match(/^\/api\/v1\/runs\/([^/]+)$/u);
if (method === "GET" && runMatch) return store.getRun(runMatch[1] ?? "") as unknown as JsonValue;
const eventMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/events$/u);
if (method === "GET" && eventMatch) {
const afterSeq = integerQuery(url, "afterSeq", 0);
const limit = integerQuery(url, "limit", 100);
return { items: store.listEvents(eventMatch[1] ?? "", afterSeq, limit) as unknown as JsonValue };
}
const commandCreateMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands$/u);
if (method === "POST" && commandCreateMatch) return store.createCommand(commandCreateMatch[1] ?? "", validateCreateCommand(body)) as unknown as JsonValue;
if (method === "GET" && commandCreateMatch) return { items: store.listCommands(commandCreateMatch[1] ?? "", integerQuery(url, "afterSeq", 0), integerQuery(url, "limit", 20)) as unknown as JsonValue };
const commandShowMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands\/([^/]+)$/u);
if (method === "GET" && commandShowMatch) return store.getCommand(commandShowMatch[2] ?? "") as unknown as JsonValue;
if (method === "POST" && path === "/api/v1/runners/register") return store.registerRunner(asRecord(body ?? {}, "runner")) as unknown as JsonValue;
const claimMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/claim$/u);
if (method === "POST" && claimMatch) {
const record = asRecord(body, "claim");
const runnerId = typeof record.runnerId === "string" ? record.runnerId : "";
if (runnerId.length === 0) throw new AgentRunError("schema-invalid", "runnerId is required", { httpStatus: 400 });
return store.claimRun(claimMatch[1] ?? "", runnerId, numberField(record, "leaseMs", 60_000)) as unknown as JsonValue;
}
const eventsAppendMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/events$/u);
if (method === "POST" && eventsAppendMatch) {
const record = asRecord(body, "event");
const type = typeof record.type === "string" ? record.type as RunEvent["type"] : "backend_status";
return store.appendEvent(eventsAppendMatch[1] ?? "", type, asRecord(record.payload ?? {}, "event.payload")) as unknown as JsonValue;
}
const statusMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/status$/u);
if (method === "PATCH" && statusMatch) {
const record = asRecord(body, "status");
const terminalStatus = record.terminalStatus === "completed" || record.terminalStatus === "failed" || record.terminalStatus === "blocked" || record.terminalStatus === "cancelled" ? record.terminalStatus : "failed";
return store.finishRun(statusMatch[1] ?? "", { terminalStatus, failureKind: typeof record.failureKind === "string" ? record.failureKind as never : null, failureMessage: typeof record.failureMessage === "string" ? record.failureMessage : null }) as unknown as JsonValue;
}
const ackMatch = path.match(/^\/api\/v1\/commands\/([^/]+)\/ack$/u);
if (method === "POST" && ackMatch) return store.ackCommand(ackMatch[1] ?? "") as unknown as JsonValue;
throw new AgentRunError("schema-invalid", `unsupported route ${method} ${path}`, { httpStatus: 404 });
}
function integerQuery(url: URL, key: string, fallback: number): number {
const value = Number(url.searchParams.get(key));
return Number.isInteger(value) && value >= 0 ? value : fallback;
}
function numberField(record: JsonRecord, key: string, fallback: number): number {
const value = record[key];
return typeof value === "number" && Number.isFinite(value) ? value : fallback;
}
function normalizeError(error: unknown): AgentRunError {
if (error instanceof AgentRunError) return error;
return new AgentRunError("infra-failed", error instanceof Error ? error.message : String(error), { httpStatus: 500 });
}
function writeJson(res: import("node:http").ServerResponse, statusCode: number, body: ApiOkBody | ApiErrorBody): void {
const text = `${JSON.stringify(body)}\n`;
res.writeHead(statusCode, { "content-type": "application/json; charset=utf-8", "content-length": Buffer.byteLength(text) });
res.end(text);
}
+138
View File
@@ -0,0 +1,138 @@
import type { BackendProfile, BackendTurnResult, CommandRecord, CreateCommandInput, CreateRunInput, FailureKind, JsonRecord, RunEvent, RunnerRecord, RunRecord, TerminalStatus } from "../common/types.js";
import { AgentRunError } from "../common/errors.js";
import { newId, nowIso, stableHash } from "../common/validation.js";
import { redactJson } from "../common/redaction.js";
export interface AgentRunStore {
createRun(input: CreateRunInput): RunRecord;
getRun(runId: string): RunRecord;
listEvents(runId: string, afterSeq: number, limit: number): RunEvent[];
createCommand(runId: string, input: CreateCommandInput): CommandRecord;
getCommand(commandId: string): CommandRecord;
listCommands(runId: string, afterSeq: number, limit: number): CommandRecord[];
registerRunner(input: Partial<RunnerRecord>): RunnerRecord;
claimRun(runId: string, runnerId: string, leaseMs: number): RunRecord;
heartbeat(runId: string, runnerId: string, leaseMs: number): RunRecord;
ackCommand(commandId: string): CommandRecord;
appendEvent(runId: string, type: RunEvent["type"], payload: JsonRecord): RunEvent;
finishRun(runId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage">): RunRecord;
backends(): JsonRecord[];
}
export class MemoryAgentRunStore implements AgentRunStore {
private readonly runs = new Map<string, RunRecord>();
private readonly commands = new Map<string, CommandRecord>();
private readonly eventsByRun = new Map<string, RunEvent[]>();
private readonly runners = new Map<string, RunnerRecord>();
createRun(input: CreateRunInput): RunRecord {
const at = nowIso();
const run: RunRecord = { ...input, id: newId("run"), status: "pending", terminalStatus: null, failureKind: null, failureMessage: null, createdAt: at, updatedAt: at, claimedBy: null, leaseExpiresAt: null };
this.runs.set(run.id, run);
this.eventsByRun.set(run.id, []);
this.appendEvent(run.id, "backend_status", { phase: "run-created", backendProfile: run.backendProfile });
return run;
}
getRun(runId: string): RunRecord {
const run = this.runs.get(runId);
if (!run) throw new AgentRunError("schema-invalid", `run ${runId} was not found`, { httpStatus: 404 });
return run;
}
listEvents(runId: string, afterSeq: number, limit: number): RunEvent[] {
this.getRun(runId);
return (this.eventsByRun.get(runId) ?? []).filter((event) => event.seq > afterSeq).slice(0, Math.max(1, Math.min(limit, 500)));
}
createCommand(runId: string, input: CreateCommandInput): CommandRecord {
this.getRun(runId);
const payloadHash = stableHash(input.payload);
if (input.idempotencyKey) {
const existing = Array.from(this.commands.values()).find((command) => command.runId === runId && command.idempotencyKey === input.idempotencyKey);
if (existing) {
if (existing.payloadHash !== payloadHash) throw new AgentRunError("schema-invalid", "idempotency key reused with different payload", { httpStatus: 409 });
return existing;
}
}
const at = nowIso();
const seq = Array.from(this.commands.values()).filter((command) => command.runId === runId).length + 1;
const command: CommandRecord = { ...input, id: newId("cmd"), runId, seq, state: "pending", payloadHash, createdAt: at, updatedAt: at, acknowledgedAt: null };
this.commands.set(command.id, command);
this.appendEvent(runId, "backend_status", { phase: "command-created", commandId: command.id, commandType: command.type });
return command;
}
getCommand(commandId: string): CommandRecord {
const command = this.commands.get(commandId);
if (!command) throw new AgentRunError("schema-invalid", `command ${commandId} was not found`, { httpStatus: 404 });
return command;
}
listCommands(runId: string, afterSeq: number, limit: number): CommandRecord[] {
this.getRun(runId);
return Array.from(this.commands.values()).filter((command) => command.runId === runId && command.seq > afterSeq).slice(0, Math.max(1, Math.min(limit, 100)));
}
registerRunner(input: Partial<RunnerRecord>): RunnerRecord {
const at = nowIso();
const runner: RunnerRecord = { id: input.id ?? newId("runner"), registeredAt: at, heartbeatAt: at, ...input };
this.runners.set(runner.id, runner);
return runner;
}
claimRun(runId: string, runnerId: string, leaseMs: number): RunRecord {
const run = this.getRun(runId);
if (run.claimedBy && run.claimedBy !== runnerId && run.status !== "completed" && run.status !== "failed" && run.status !== "cancelled") throw new AgentRunError("runner-lease-conflict", `run ${runId} is already claimed`, { httpStatus: 409 });
const next = this.updateRun(runId, { status: "claimed", claimedBy: runnerId, leaseExpiresAt: new Date(Date.now() + leaseMs).toISOString() });
this.appendEvent(runId, "backend_status", { phase: "run-claimed", runnerId });
return next;
}
heartbeat(runId: string, runnerId: string, leaseMs: number): RunRecord {
const run = this.getRun(runId);
if (run.claimedBy !== runnerId) throw new AgentRunError("runner-lease-conflict", `run ${runId} is not claimed by ${runnerId}`, { httpStatus: 409 });
return this.updateRun(runId, { leaseExpiresAt: new Date(Date.now() + leaseMs).toISOString() });
}
ackCommand(commandId: string): CommandRecord {
const command = this.getCommand(commandId);
const next = { ...command, state: "acknowledged" as const, acknowledgedAt: nowIso(), updatedAt: nowIso() };
this.commands.set(commandId, next);
return next;
}
appendEvent(runId: string, type: RunEvent["type"], payload: JsonRecord): RunEvent {
this.getRun(runId);
const events = this.eventsByRun.get(runId) ?? [];
const event: RunEvent = { id: newId("evt"), runId, seq: events.length + 1, type, payload: redactJson(payload), createdAt: nowIso() };
events.push(event);
this.eventsByRun.set(runId, events);
return event;
}
finishRun(runId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage">): RunRecord {
const status = statusFromTerminal(result.terminalStatus);
const next = this.updateRun(runId, { status, terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage });
this.appendEvent(runId, "terminal_status", { terminalStatus: result.terminalStatus, failureKind: result.failureKind, message: result.failureMessage });
return next;
}
backends(): JsonRecord[] {
return [{ profile: "codex" satisfies BackendProfile, protocol: "codex-app-server-jsonrpc-stdio", transport: "stdio", command: "codex app-server --listen stdio://", status: "registered" }];
}
private updateRun(runId: string, patch: Partial<RunRecord>): RunRecord {
const run = this.getRun(runId);
const next = { ...run, ...patch, updatedAt: nowIso() };
this.runs.set(runId, next);
return next;
}
}
function statusFromTerminal(terminalStatus: TerminalStatus): RunRecord["status"] {
if (terminalStatus === "completed") return "completed";
if (terminalStatus === "cancelled") return "cancelled";
if (terminalStatus === "blocked") return "blocked";
return "failed";
}
+19
View File
@@ -0,0 +1,19 @@
import { runOnce, type RunnerOnceOptions } from "./run-once.js";
const managerUrl = process.env.AGENTRUN_MGR_URL;
const runId = process.env.AGENTRUN_RUN_ID;
if (!managerUrl || !runId) {
console.log(JSON.stringify({ ok: false, failureKind: "schema-invalid", message: "AGENTRUN_MGR_URL and AGENTRUN_RUN_ID are required" }));
process.exit(2);
}
const options: RunnerOnceOptions = {
managerUrl,
runId,
};
if (process.env.AGENTRUN_RUNNER_ID) options.runnerId = process.env.AGENTRUN_RUNNER_ID;
if (process.env.AGENTRUN_CODEX_COMMAND) options.codexCommand = process.env.AGENTRUN_CODEX_COMMAND;
if (process.env.AGENTRUN_CODEX_ARGS) options.codexArgs = JSON.parse(process.env.AGENTRUN_CODEX_ARGS) as string[];
if (process.env.CODEX_HOME) options.codexHome = process.env.CODEX_HOME;
const result = await runOnce(options);
console.log(JSON.stringify({ ok: true, data: result }));
+35
View File
@@ -0,0 +1,35 @@
import { ManagerClient } from "../mgr/client.js";
import { runBackendTurn, type BackendAdapterOptions } from "../backend/adapter.js";
import type { CommandRecord, JsonRecord, RunRecord, RunnerRecord } from "../common/types.js";
export interface RunnerOnceOptions extends BackendAdapterOptions {
managerUrl: string;
runId: string;
runnerId?: string;
attemptId?: string;
leaseMs?: number;
}
export async function runOnce(options: RunnerOnceOptions): Promise<JsonRecord> {
const client = new ManagerClient(options.managerUrl);
const runner = await client.post("/api/v1/runners/register", {
id: options.runnerId ?? undefined,
runId: options.runId,
attemptId: options.attemptId ?? `attempt_${Date.now().toString(36)}`,
backendProfile: "codex",
placement: "host-process",
sourceCommit: process.env.AGENTRUN_SOURCE_COMMIT ?? "unknown",
} as JsonRecord) as unknown as RunnerRecord;
const claimed = await client.post(`/api/v1/runs/${options.runId}/claim`, { runnerId: runner.id, leaseMs: options.leaseMs ?? 60_000 }) as unknown as RunRecord;
const commandsResponse = await client.get(`/api/v1/runs/${options.runId}/commands?afterSeq=0&limit=20`) as { items?: CommandRecord[] };
const command = commandsResponse.items?.find((item) => item.state === "pending" && item.type === "turn");
if (!command) {
await client.patch(`/api/v1/runs/${options.runId}/status`, { terminalStatus: "blocked", failureKind: "schema-invalid", failureMessage: "no pending turn command" });
return { runner, claimed, terminalStatus: "blocked", failureKind: "schema-invalid" };
}
await client.post(`/api/v1/commands/${command.id}/ack`, {});
const result = await runBackendTurn(claimed, command, options);
for (const event of result.events) await client.post(`/api/v1/runs/${options.runId}/events`, event as unknown as JsonRecord);
const finalRun = await client.patch(`/api/v1/runs/${options.runId}/status`, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage }) as unknown as RunRecord;
return { runner, commandId: command.id, terminalStatus: result.terminalStatus, failureKind: result.failureKind, run: finalRun } as JsonRecord;
}
+48
View File
@@ -0,0 +1,48 @@
import * as readline from "node:readline";
const rl = readline.createInterface({ input: process.stdin, crlfDelay: Infinity });
let threadCounter = 0;
let turnCounter = 0;
for await (const line of rl) {
const trimmed = String(line).trim();
if (trimmed.length === 0) continue;
const message = JSON.parse(trimmed) as { id?: number; method?: string; params?: Record<string, unknown> };
if (message.method === "initialize") {
respond(message.id, { serverInfo: { name: "fake-codex-app-server", version: "self-test" } });
continue;
}
if (message.method === "thread/start") {
threadCounter += 1;
const thread = { id: `thread_selftest_${threadCounter}` };
notify("thread/started", { thread });
respond(message.id, { thread });
continue;
}
if (message.method === "thread/resume") {
const thread = { id: String(message.params?.threadId ?? "thread_selftest_resumed") };
notify("thread/started", { thread });
respond(message.id, { thread });
continue;
}
if (message.method === "turn/start") {
turnCounter += 1;
const turn = { id: `turn_selftest_${turnCounter}`, status: "completed" };
notify("turn/started", { turn });
notify("item/agentMessage/delta", { itemId: "msg_selftest", delta: "fake codex stdio reply" });
notify("item/commandExecution/outputDelta", { itemId: "cmd_selftest", delta: "Authorization: Bearer test-token\n" });
notify("turn/completed", { turn });
respond(message.id, { turn });
continue;
}
respond(message.id, null, { code: -32601, message: `unsupported fake method ${message.method ?? "unknown"}` });
}
function respond(id: number | undefined, result: unknown, error?: unknown): void {
if (id === undefined) return;
process.stdout.write(`${JSON.stringify(error ? { id, error } : { id, result })}\n`);
}
function notify(method: string, params: unknown): void {
process.stdout.write(`${JSON.stringify({ method, params })}\n`);
}
+65
View File
@@ -0,0 +1,65 @@
import { mkdtemp, mkdir, writeFile, rm } from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { fileURLToPath } from "node:url";
import assert from "node:assert/strict";
import { startManagerServer } from "../mgr/server.js";
import { ManagerClient } from "../mgr/client.js";
import { runOnce } from "../runner/run-once.js";
import { redactText } from "../common/redaction.js";
const root = path.resolve(path.dirname(fileURLToPath(import.meta.url)), "../..");
const tmp = await mkdtemp(path.join(os.tmpdir(), "agentrun-selftest-"));
try {
const codexHome = path.join(tmp, "codex-home");
const workspace = path.join(tmp, "workspace");
await mkdir(codexHome, { recursive: true });
await mkdir(workspace, { recursive: true });
await writeFile(path.join(codexHome, "auth.json"), JSON.stringify({ token: "test-token-material" }));
await writeFile(path.join(codexHome, "config.toml"), "model = \"gpt-test\"\n");
await writeFile(path.join(workspace, "README.md"), "self-test workspace\n");
assert.equal(redactText("Authorization: Bearer abc123"), "Authorization: Bearer REDACTED");
const server = await startManagerServer({ port: 0, host: "127.0.0.1", sourceCommit: "self-test" });
try {
const client = new ManagerClient(server.baseUrl);
const health = await client.get("/health/readiness") as { database?: { adapter?: string } };
assert.equal(health.database?.adapter, "memory-self-test");
const run = await client.post("/api/v1/runs", {
tenantId: "unidesk",
projectId: "pikasTech/unidesk",
workspaceRef: { kind: "host-path", path: workspace },
providerId: "G14",
backendProfile: "codex",
executionPolicy: {
sandbox: "workspace-write",
approval: "never",
timeoutMs: 15_000,
network: "default",
secretScope: { allowCredentialEcho: false, providerCredentials: [{ profile: "codex", secretRef: { name: "agentrun-v01-provider-codex", keys: ["auth.json", "config.toml"], mountPath: codexHome } }] },
},
traceSink: null,
}) as { id: string };
const command = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "hello" }, idempotencyKey: "selftest-turn" }) as { id: string };
const duplicate = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "hello" }, idempotencyKey: "selftest-turn" }) as { id: string };
assert.equal(duplicate.id, command.id);
const fakePath = path.join(root, "src/selftest/fake-codex-app-server.ts");
const fakeCommand = process.env.AGENTRUN_SELFTEST_CODEX_COMMAND ?? process.execPath;
const fakeArgs = process.env.AGENTRUN_SELFTEST_CODEX_ARGS ? JSON.parse(process.env.AGENTRUN_SELFTEST_CODEX_ARGS) as string[] : [fakePath];
const result = await runOnce({ managerUrl: server.baseUrl, runId: run.id, codexCommand: fakeCommand, codexArgs: fakeArgs, codexHome, env: { CODEX_HOME: codexHome } });
assert.equal(result.terminalStatus, "completed");
const events = await client.get(`/api/v1/runs/${run.id}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> };
assert.ok(events.items?.some((event) => event.type === "assistant_message"));
assert.equal(JSON.stringify(events).includes("test-token-material"), false);
assert.equal(JSON.stringify(events).includes("Bearer test-token"), false);
const finalRun = await client.get(`/api/v1/runs/${run.id}`) as { terminalStatus?: string };
assert.equal(finalRun.terminalStatus, "completed");
console.log(JSON.stringify({ ok: true, tests: ["manager-memory-lifecycle", "codex-stdio-fake-turn", "redaction"], runId: run.id }));
} finally {
await new Promise<void>((resolve) => server.server.close(() => resolve()));
}
} finally {
await rm(tmp, { recursive: true, force: true });
}
+17
View File
@@ -0,0 +1,17 @@
{
"compilerOptions": {
"target": "ES2022",
"module": "NodeNext",
"moduleResolution": "NodeNext",
"lib": ["ES2022", "DOM"],
"strict": true,
"noImplicitReturns": true,
"noFallthroughCasesInSwitch": true,
"exactOptionalPropertyTypes": true,
"skipLibCheck": true,
"types": ["node"],
"rootDir": ".",
"noEmit": true
},
"include": ["scripts/**/*.ts", "src/**/*.ts"]
}