Merge pull request #41 from pikasTech/fix/v01-runner-turn-reuse-658
feat: 支持同 run runner 多轮 command
This commit is contained in:
@@ -84,6 +84,11 @@ async function dispatch(args: ParsedArgs): Promise<JsonValue> {
|
||||
}
|
||||
if (codexCommand) options.codexCommand = codexCommand;
|
||||
if (codexHome) options.codexHome = codexHome;
|
||||
const idleTimeoutMs = optionalFlag(args, "idle-timeout-ms");
|
||||
const pollIntervalMs = optionalFlag(args, "poll-interval-ms");
|
||||
if (idleTimeoutMs) options.idleTimeoutMs = Number(idleTimeoutMs);
|
||||
if (pollIntervalMs) options.pollIntervalMs = Number(pollIntervalMs);
|
||||
if (args.flags.get("one-shot") === true) options.oneShot = true;
|
||||
return runOnce(options) as unknown as JsonValue;
|
||||
}
|
||||
if (group === "runner" && command === "job") return renderRunnerJob(args);
|
||||
|
||||
@@ -33,21 +33,24 @@ export function normalizeRunEventPayload(type: EventType, payload: JsonRecord):
|
||||
export function eventContractSummary(events: RunEvent[]): JsonRecord {
|
||||
const issues: JsonRecord[] = [];
|
||||
let terminalStatusCount = 0;
|
||||
let runTerminalStatusCount = 0;
|
||||
for (let index = 0; index < events.length; index += 1) {
|
||||
const event = events[index];
|
||||
if (!eventTypeSet.has(event.type)) issues.push({ code: "event-type-invalid", seq: event.seq, type: event.type });
|
||||
if (event.seq !== index + 1) issues.push({ code: "seq-not-contiguous", expectedSeq: index + 1, actualSeq: event.seq });
|
||||
if (event.type === "terminal_status") {
|
||||
terminalStatusCount += 1;
|
||||
if (typeof event.payload.commandId !== "string") runTerminalStatusCount += 1;
|
||||
if (!isTerminalStatus(event.payload.terminalStatus)) issues.push({ code: "terminal-status-invalid", seq: event.seq, terminalStatus: String(event.payload.terminalStatus ?? "") });
|
||||
}
|
||||
}
|
||||
if (terminalStatusCount > 1) issues.push({ code: "terminal-status-duplicated", terminalStatusCount });
|
||||
if (runTerminalStatusCount > 1) issues.push({ code: "run-terminal-status-duplicated", runTerminalStatusCount });
|
||||
return {
|
||||
ok: issues.length === 0,
|
||||
eventCount: events.length,
|
||||
lastSeq: events.at(-1)?.seq ?? 0,
|
||||
terminalStatusCount,
|
||||
runTerminalStatusCount,
|
||||
issues,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ import { redactJson } from "../common/redaction.js";
|
||||
import type { BackendProfile, BackendTurnResult, CommandRecord, CommandState, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, EventType, FailureKind, JsonRecord, JsonValue, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, RunStatus, SessionRecord, SessionRef, TerminalStatus } from "../common/types.js";
|
||||
import { newId, nowIso, stableHash } from "../common/validation.js";
|
||||
import type { AgentRunStore, ListQueueTasksInput, SaveRunnerJobInput, StoreHealth } from "./store.js";
|
||||
import { assertSessionBoundary, buildQueueStats, clampQueueLimit, commandStateFromTerminal, isTerminalCommandState, isTerminalQueueTaskState, isTerminalRunStatus, parseQueueCursor, queueTaskSort, sessionRefFromRecord, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef } from "./store.js";
|
||||
import { assertSessionBoundary, buildQueueStats, clampQueueLimit, commandStateFromTerminal, isLeaseExpired, isTerminalCommandState, isTerminalQueueTaskState, isTerminalRunStatus, parseQueueCursor, queueTaskSort, sessionRefFromRecord, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef } from "./store.js";
|
||||
import { backendCapabilitiesSqlValues, mergeBackendCapability } from "../common/backend-profiles.js";
|
||||
import { normalizeRunEventPayload, requireEventType } from "../common/events.js";
|
||||
|
||||
@@ -332,7 +332,8 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
|
||||
async createCommand(runId: string, input: CreateCommandInput): Promise<CommandRecord> {
|
||||
const payloadHash = stableHash(input.payload);
|
||||
return this.withTransaction(async (client) => {
|
||||
await this.requireRunForUpdate(client, runId);
|
||||
const run = await this.requireRunForUpdate(client, runId);
|
||||
if (isTerminalRunStatus(run.status)) throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${runId} is already terminal: ${run.status}`, { httpStatus: 409 });
|
||||
if (input.idempotencyKey) {
|
||||
const existing = await client.query("SELECT * FROM agentrun_commands WHERE run_id = $1 AND idempotency_key = $2", [runId, input.idempotencyKey]);
|
||||
if (existing.rows[0]) {
|
||||
@@ -436,7 +437,7 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
|
||||
return this.withTransaction(async (client) => {
|
||||
const run = await this.requireRunForUpdate(client, runId);
|
||||
if (isTerminalRunStatus(run.status)) throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${runId} is already terminal: ${run.status}`, { httpStatus: 409 });
|
||||
if (run.claimedBy && run.claimedBy !== runnerId) throw new AgentRunError("runner-lease-conflict", `run ${runId} is already claimed`, { httpStatus: 409 });
|
||||
if (run.claimedBy && run.claimedBy !== runnerId && !isLeaseExpired(run.leaseExpiresAt)) throw new AgentRunError("runner-lease-conflict", `run ${runId} is already claimed`, { httpStatus: 409 });
|
||||
const leaseExpiresAt = new Date(Date.now() + leaseMs).toISOString();
|
||||
const updated = await client.query(
|
||||
`UPDATE agentrun_runs SET status = $2, claimed_by = $3, lease_expires_at = $4, updated_at = $5 WHERE id = $1 RETURNING *`,
|
||||
@@ -478,7 +479,7 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
|
||||
});
|
||||
}
|
||||
|
||||
async finishCommand(commandId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage">): Promise<CommandRecord> {
|
||||
async finishCommand(commandId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage" | "threadId" | "turnId">): Promise<CommandRecord> {
|
||||
return this.withTransaction(async (client) => {
|
||||
const existing = await client.query("SELECT * FROM agentrun_commands WHERE id = $1 FOR UPDATE", [commandId]);
|
||||
const row = existing.rows[0];
|
||||
@@ -487,7 +488,9 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
|
||||
if (isTerminalCommandState(command.state)) return command;
|
||||
const state = commandStateFromTerminal(result.terminalStatus);
|
||||
const updated = await client.query("UPDATE agentrun_commands SET state = $2, updated_at = $3 WHERE id = $1 RETURNING *", [commandId, state, nowIso()]);
|
||||
await this.appendEventWithLockedRun(client, command.runId, "backend_status", { phase: "command-terminal", commandId, state, terminalStatus: result.terminalStatus, failureKind: result.failureKind });
|
||||
const run = await this.requireRunForUpdate(client, command.runId);
|
||||
if (result.threadId && run.sessionRef?.sessionId) await this.upsertSessionThread(client, run, result.threadId, result.turnId ?? null);
|
||||
await this.appendEventWithLockedRun(client, command.runId, "backend_status", { phase: "command-terminal", commandId, state, terminalStatus: result.terminalStatus, failureKind: result.failureKind, message: result.failureMessage ?? null, threadId: result.threadId ?? null, turnId: result.turnId ?? null });
|
||||
return commandFromRow(updated.rows[0]);
|
||||
});
|
||||
}
|
||||
@@ -544,13 +547,7 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
|
||||
const command = commandFromRow(row);
|
||||
if (isTerminalCommandState(command.state)) return command;
|
||||
const updated = await client.query("UPDATE agentrun_commands SET state = 'cancelled', updated_at = $2 WHERE id = $1 RETURNING *", [commandId, nowIso()]);
|
||||
await this.appendEventWithLockedRun(client, command.runId, "backend_status", { phase: "command-terminal", commandId, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled" });
|
||||
const run = await this.requireRunForUpdate(client, command.runId);
|
||||
if (!isTerminalRunStatus(run.status)) {
|
||||
await this.appendEventWithLockedRun(client, command.runId, "backend_status", { phase: "cancel-requested", reason });
|
||||
await client.query(`UPDATE agentrun_runs SET status = 'cancelled', terminal_status = 'cancelled', failure_kind = 'cancelled', failure_message = $2, updated_at = $3 WHERE id = $1`, [command.runId, reason, nowIso()]);
|
||||
await this.appendEventWithLockedRun(client, command.runId, "terminal_status", { terminalStatus: "cancelled", failureKind: "cancelled", message: reason });
|
||||
}
|
||||
await this.appendEventWithLockedRun(client, command.runId, "backend_status", { phase: "command-terminal", commandId, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled", message: reason });
|
||||
return commandFromRow(updated.rows[0]);
|
||||
});
|
||||
}
|
||||
|
||||
+23
-8
@@ -6,14 +6,17 @@ export async function buildRunResult(store: AgentRunStore, runId: string, comman
|
||||
const run = await store.getRun(runId);
|
||||
const command = await selectCommand(store, runId, commandId);
|
||||
const events = await store.listEvents(runId, 0, 500);
|
||||
const scopedEvents = command ? eventsForCommand(events, command.id) : events;
|
||||
const jobs = await store.listRunnerJobs(runId, command?.id);
|
||||
const latestJob = jobs.at(-1) ?? null;
|
||||
const terminalEventStatus = terminalFromEvents(events);
|
||||
const terminal = terminalEventStatus ?? run.terminalStatus;
|
||||
const terminalSource = terminalEventStatus ? "terminal_status-event" : run.terminalStatus ? "run-record" : "none";
|
||||
const failureKind = run.failureKind ?? failureKindFromEvents(events);
|
||||
const reply = assistantReply(events);
|
||||
const blocker = terminal === "blocked" || terminal === "failed" ? { failureKind, message: run.failureMessage ?? messageFromEvents(events) } : null;
|
||||
const commandTerminal = command ? terminalFromCommand(command) : null;
|
||||
const terminalEventStatus = terminalFromEvents(scopedEvents);
|
||||
const terminal = commandTerminal ?? terminalEventStatus ?? run.terminalStatus;
|
||||
const terminalSource = commandTerminal ? "command-record" : terminalEventStatus ? "terminal_status-event" : run.terminalStatus ? "run-record" : "none";
|
||||
const failureKind = command ? failureKindFromEvents(scopedEvents) : run.failureKind ?? failureKindFromEvents(scopedEvents);
|
||||
const failureMessage = command ? messageFromEvents(scopedEvents) : run.failureMessage ?? messageFromEvents(scopedEvents);
|
||||
const reply = assistantReply(scopedEvents);
|
||||
const blocker = terminal === "blocked" || terminal === "failed" ? { failureKind, message: failureMessage } : null;
|
||||
return {
|
||||
runId: run.id,
|
||||
commandId: command?.id ?? commandId ?? null,
|
||||
@@ -29,11 +32,11 @@ export async function buildRunResult(store: AgentRunStore, runId: string, comman
|
||||
completed: terminal === "completed",
|
||||
reply,
|
||||
failureKind,
|
||||
failureMessage: run.failureMessage ?? messageFromEvents(events),
|
||||
failureMessage,
|
||||
blocker,
|
||||
lastSeq: events.at(-1)?.seq ?? 0,
|
||||
eventCount: events.length,
|
||||
artifactSummary: artifactSummary(events),
|
||||
artifactSummary: artifactSummary(scopedEvents),
|
||||
sessionRef: sessionSummary(run),
|
||||
resourceBundleRef: resourceBundleSummary(run, events),
|
||||
runnerJobCount: jobs.length,
|
||||
@@ -58,6 +61,18 @@ function terminalFromEvents(events: RunEvent[]): TerminalStatus | null {
|
||||
return null;
|
||||
}
|
||||
|
||||
function terminalFromCommand(command: CommandRecord): TerminalStatus | null {
|
||||
if (command.state === "completed") return "completed";
|
||||
if (command.state === "failed") return "failed";
|
||||
if (command.state === "cancelled") return "cancelled";
|
||||
return null;
|
||||
}
|
||||
|
||||
function eventsForCommand(events: RunEvent[], commandId: string): RunEvent[] {
|
||||
const scoped = events.filter((event) => event.payload.commandId === commandId || typeof event.payload.commandId !== "string");
|
||||
return scoped.length > 0 ? scoped : events;
|
||||
}
|
||||
|
||||
function failureKindFromEvents(events: RunEvent[]): string | null {
|
||||
for (const event of [...events].reverse()) {
|
||||
const value = event.payload.failureKind;
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import type { JsonRecord, RunEvent, RunnerJobRecord, TerminalStatus } from "../common/types.js";
|
||||
|
||||
export function runnerJobStatusSummary(job: RunnerJobRecord, events: RunEvent[] = []): JsonRecord {
|
||||
const terminalEvent = latestTerminalEvent(events);
|
||||
const terminalEvent = latestTerminalEvent(events, job.commandId);
|
||||
const runner = recordAt(job.result, "runner");
|
||||
const jobIdentity = recordAt(job.result, "jobIdentity");
|
||||
const kubernetes = recordAt(job.result, "kubernetes");
|
||||
@@ -36,9 +36,11 @@ export function runnerJobStatusSummary(job: RunnerJobRecord, events: RunEvent[]
|
||||
};
|
||||
}
|
||||
|
||||
function latestTerminalEvent(events: RunEvent[]): RunEvent | null {
|
||||
function latestTerminalEvent(events: RunEvent[], commandId: string): RunEvent | null {
|
||||
for (const event of [...events].reverse()) {
|
||||
if (event.payload.commandId && event.payload.commandId !== commandId) continue;
|
||||
if (event.type === "terminal_status") return event;
|
||||
if (event.type === "backend_status" && event.payload.phase === "command-terminal" && event.payload.commandId === commandId) return event;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
+7
-1
@@ -191,7 +191,13 @@ async function route({ method, url, body, store, sourceCommit, runnerJobDefaults
|
||||
if (method === "PATCH" && commandStatusMatch) {
|
||||
const record = asRecord(body, "commandStatus");
|
||||
const terminalStatus = record.terminalStatus === "completed" || record.terminalStatus === "failed" || record.terminalStatus === "blocked" || record.terminalStatus === "cancelled" ? record.terminalStatus : "failed";
|
||||
return await store.finishCommand(commandStatusMatch[1] ?? "", { terminalStatus, failureKind: typeof record.failureKind === "string" ? record.failureKind as never : null, failureMessage: typeof record.failureMessage === "string" ? record.failureMessage : null }) as unknown as JsonValue;
|
||||
return await store.finishCommand(commandStatusMatch[1] ?? "", {
|
||||
terminalStatus,
|
||||
failureKind: typeof record.failureKind === "string" ? record.failureKind as never : null,
|
||||
failureMessage: typeof record.failureMessage === "string" ? record.failureMessage : null,
|
||||
...(typeof record.threadId === "string" ? { threadId: record.threadId } : {}),
|
||||
...(typeof record.turnId === "string" ? { turnId: record.turnId } : {}),
|
||||
}) as unknown as JsonValue;
|
||||
}
|
||||
const commandCancelMatch = path.match(/^\/api\/v1\/commands\/([^/]+)\/cancel$/u);
|
||||
if (method === "POST" && commandCancelMatch) {
|
||||
|
||||
+14
-7
@@ -33,7 +33,7 @@ export interface AgentRunStore {
|
||||
claimRun(runId: string, runnerId: string, leaseMs: number): MaybePromise<RunRecord>;
|
||||
heartbeat(runId: string, runnerId: string, leaseMs: number): MaybePromise<RunRecord>;
|
||||
ackCommand(commandId: string): MaybePromise<CommandRecord>;
|
||||
finishCommand(commandId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage">): MaybePromise<CommandRecord>;
|
||||
finishCommand(commandId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage" | "threadId" | "turnId">): MaybePromise<CommandRecord>;
|
||||
appendEvent(runId: string, type: RunEvent["type"], payload: JsonRecord): MaybePromise<RunEvent>;
|
||||
finishRun(runId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage" | "threadId" | "turnId">): MaybePromise<RunRecord>;
|
||||
cancelRun(runId: string, reason?: string): MaybePromise<RunRecord>;
|
||||
@@ -122,7 +122,8 @@ export class MemoryAgentRunStore implements AgentRunStore {
|
||||
}
|
||||
|
||||
createCommand(runId: string, input: CreateCommandInput): CommandRecord {
|
||||
this.getRun(runId);
|
||||
const run = this.getRun(runId);
|
||||
if (isTerminalRunStatus(run.status)) throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${runId} is already terminal: ${run.status}`, { httpStatus: 409 });
|
||||
const payloadHash = stableHash(input.payload);
|
||||
if (input.idempotencyKey) {
|
||||
const existing = Array.from(this.commands.values()).find((command) => command.runId === runId && command.idempotencyKey === input.idempotencyKey);
|
||||
@@ -185,7 +186,7 @@ export class MemoryAgentRunStore implements AgentRunStore {
|
||||
claimRun(runId: string, runnerId: string, leaseMs: number): RunRecord {
|
||||
const run = this.getRun(runId);
|
||||
if (isTerminalRunStatus(run.status)) throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${runId} is already terminal: ${run.status}`, { httpStatus: 409 });
|
||||
if (run.claimedBy && run.claimedBy !== runnerId) throw new AgentRunError("runner-lease-conflict", `run ${runId} is already claimed`, { httpStatus: 409 });
|
||||
if (run.claimedBy && run.claimedBy !== runnerId && !isLeaseExpired(run.leaseExpiresAt)) throw new AgentRunError("runner-lease-conflict", `run ${runId} is already claimed`, { httpStatus: 409 });
|
||||
const next = this.updateRun(runId, { status: "claimed", claimedBy: runnerId, leaseExpiresAt: new Date(Date.now() + leaseMs).toISOString() });
|
||||
this.appendEvent(runId, "backend_status", { phase: "run-claimed", runnerId });
|
||||
return next;
|
||||
@@ -206,12 +207,14 @@ export class MemoryAgentRunStore implements AgentRunStore {
|
||||
return next;
|
||||
}
|
||||
|
||||
finishCommand(commandId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage">): CommandRecord {
|
||||
finishCommand(commandId: string, result: Pick<BackendTurnResult, "terminalStatus" | "failureKind" | "failureMessage" | "threadId" | "turnId">): CommandRecord {
|
||||
const command = this.getCommand(commandId);
|
||||
if (isTerminalCommandState(command.state)) return command;
|
||||
const next = { ...command, state: commandStateFromTerminal(result.terminalStatus), updatedAt: nowIso() };
|
||||
this.commands.set(commandId, next);
|
||||
this.appendEvent(command.runId, "backend_status", { phase: "command-terminal", commandId, state: next.state, terminalStatus: result.terminalStatus, failureKind: result.failureKind });
|
||||
const run = this.getRun(command.runId);
|
||||
if (result.threadId && run.sessionRef?.sessionId) this.upsertSessionThread(run, result.threadId, result.turnId ?? null);
|
||||
this.appendEvent(command.runId, "backend_status", { phase: "command-terminal", commandId, state: next.state, terminalStatus: result.terminalStatus, failureKind: result.failureKind, message: result.failureMessage ?? null, threadId: result.threadId ?? null, turnId: result.turnId ?? null });
|
||||
return next;
|
||||
}
|
||||
|
||||
@@ -256,8 +259,7 @@ export class MemoryAgentRunStore implements AgentRunStore {
|
||||
if (isTerminalCommandState(command.state)) return command;
|
||||
const next = { ...command, state: "cancelled" as const, updatedAt: nowIso() };
|
||||
this.commands.set(commandId, next);
|
||||
this.appendEvent(command.runId, "backend_status", { phase: "command-terminal", commandId, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled" });
|
||||
this.cancelRun(command.runId, reason);
|
||||
this.appendEvent(command.runId, "backend_status", { phase: "command-terminal", commandId, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled", message: reason });
|
||||
return next;
|
||||
}
|
||||
|
||||
@@ -419,6 +421,11 @@ export function isTerminalQueueTaskState(state: QueueTaskState): boolean {
|
||||
return state === "completed" || state === "failed" || state === "blocked" || state === "cancelled";
|
||||
}
|
||||
|
||||
export function isLeaseExpired(leaseExpiresAt: string | null): boolean {
|
||||
if (!leaseExpiresAt) return true;
|
||||
return new Date(leaseExpiresAt).getTime() <= Date.now();
|
||||
}
|
||||
|
||||
export function sessionRefFromRecord(record: SessionRecord, fallback: SessionRef): SessionRef {
|
||||
return {
|
||||
sessionId: record.sessionId,
|
||||
|
||||
@@ -157,6 +157,7 @@ function runnerEnv(options: RunnerJobRenderOptions, context: { namespace: string
|
||||
{ name: "AGENTRUN_RUNTIME_NAMESPACE", value: context.namespace },
|
||||
{ name: "AGENTRUN_K8S_JOB_NAME", value: context.jobName },
|
||||
{ name: "AGENTRUN_LOG_PATH", value: "/tmp/agentrun-runner.jsonl" },
|
||||
{ name: "AGENTRUN_RUNNER_IDLE_TIMEOUT_MS", value: "600000" },
|
||||
{ name: "HOME", value: "/home/agentrun" },
|
||||
{ name: "CODEX_HOME", value: codexHome },
|
||||
...(selectedSecret ? [{ name: "AGENTRUN_CODEX_SECRET_HOME", value: selectedSecret.projectionMountPath }] : []),
|
||||
|
||||
@@ -32,6 +32,9 @@ if (process.env.AGENTRUN_LOG_PATH) options.logPath = process.env.AGENTRUN_LOG_PA
|
||||
if (process.env.AGENTRUN_CODEX_COMMAND) options.codexCommand = process.env.AGENTRUN_CODEX_COMMAND;
|
||||
if (process.env.AGENTRUN_CODEX_ARGS) options.codexArgs = JSON.parse(process.env.AGENTRUN_CODEX_ARGS) as string[];
|
||||
if (process.env.CODEX_HOME) options.codexHome = process.env.CODEX_HOME;
|
||||
if (process.env.AGENTRUN_RUNNER_IDLE_TIMEOUT_MS) options.idleTimeoutMs = Number(process.env.AGENTRUN_RUNNER_IDLE_TIMEOUT_MS);
|
||||
if (process.env.AGENTRUN_RUNNER_POLL_INTERVAL_MS) options.pollIntervalMs = Number(process.env.AGENTRUN_RUNNER_POLL_INTERVAL_MS);
|
||||
if (process.env.AGENTRUN_RUNNER_ONE_SHOT === "1" || process.env.AGENTRUN_RUNNER_ONE_SHOT === "true") options.oneShot = true;
|
||||
try {
|
||||
const result = await runOnce(options);
|
||||
console.log(JSON.stringify({ ok: true, data: result }));
|
||||
|
||||
@@ -84,7 +84,7 @@ export class RunnerManagerApi {
|
||||
return await this.client.patch(`/api/v1/runs/${encodeURIComponent(runId)}/status`, report as unknown as JsonRecord) as RunRecord;
|
||||
}
|
||||
|
||||
async reportCommandStatus(commandId: string, report: { terminalStatus: TerminalStatus; failureKind: FailureKind | null; failureMessage: string | null }): Promise<CommandRecord> {
|
||||
async reportCommandStatus(commandId: string, report: { terminalStatus: TerminalStatus; failureKind: FailureKind | null; failureMessage: string | null; threadId?: string; turnId?: string }): Promise<CommandRecord> {
|
||||
return await this.client.patch(`/api/v1/commands/${encodeURIComponent(commandId)}/status`, report as unknown as JsonRecord) as CommandRecord;
|
||||
}
|
||||
|
||||
|
||||
+129
-33
@@ -1,7 +1,7 @@
|
||||
import { RunnerManagerApi, failureKindFromError, terminalStatusForFailure, errorMessage } from "./manager-api.js";
|
||||
import { runBackendTurn, type BackendAdapterOptions } from "../backend/adapter.js";
|
||||
import { materializeResourceBundle } from "./resource-bundle.js";
|
||||
import type { BackendProfile, JsonRecord, RunRecord, RunnerRecord } from "../common/types.js";
|
||||
import type { BackendEvent, BackendProfile, CommandRecord, FailureKind, JsonRecord, RunRecord, RunnerRecord, TerminalStatus } from "../common/types.js";
|
||||
import { AgentRunError } from "../common/errors.js";
|
||||
|
||||
export interface RunnerOnceOptions extends BackendAdapterOptions {
|
||||
@@ -17,11 +17,20 @@ export interface RunnerOnceOptions extends BackendAdapterOptions {
|
||||
jobName?: string;
|
||||
podName?: string;
|
||||
logPath?: string;
|
||||
idleTimeoutMs?: number;
|
||||
pollIntervalMs?: number;
|
||||
oneShot?: boolean;
|
||||
}
|
||||
|
||||
interface CommandExecutionResult extends JsonRecord {
|
||||
commandId: string;
|
||||
terminalStatus: TerminalStatus;
|
||||
failureKind: FailureKind | null;
|
||||
}
|
||||
|
||||
export async function runOnce(options: RunnerOnceOptions): Promise<JsonRecord> {
|
||||
const api = new RunnerManagerApi(options.managerUrl);
|
||||
const targetRun = await api.client.get(`/api/v1/runs/${encodeURIComponent(options.runId)}`) as RunRecord;
|
||||
const targetRun = await api.getRun(options.runId);
|
||||
if (isTerminalRun(targetRun)) return { terminalStatus: targetRun.terminalStatus, failureKind: targetRun.failureKind, run: targetRun, skipped: "run-terminal" } as JsonRecord;
|
||||
if (options.backendProfile && options.backendProfile !== targetRun.backendProfile) {
|
||||
throw new AgentRunError("schema-invalid", `runner backendProfile ${options.backendProfile} does not match run backendProfile ${targetRun.backendProfile}`, { httpStatus: 400 });
|
||||
@@ -50,41 +59,87 @@ export async function runOnce(options: RunnerOnceOptions): Promise<JsonRecord> {
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
const commandsResponse = await api.pollCommands(options.runId, { afterSeq: 0, limit: 20, ...(options.commandId ? { commandId: options.commandId } : {}) });
|
||||
const command = commandsResponse.selected;
|
||||
if (!command) {
|
||||
await api.reportStatus(options.runId, { terminalStatus: "blocked", failureKind: "schema-invalid", failureMessage: "no pending turn command" });
|
||||
return { runner, claimed, terminalStatus: "blocked", failureKind: "schema-invalid", polledCommands: commandsResponse.items.length };
|
||||
}
|
||||
await api.ackCommand(command.id);
|
||||
const acked = await api.getCommand(options.runId, command.id);
|
||||
if (acked.state === "cancelled") return await reportCancelled(api, options.runId, command.id, runner, claimed, "command cancelled before backend start");
|
||||
await assertNotCancelled(api, options.runId, command.id);
|
||||
const abortController = new AbortController();
|
||||
const stopCancelWatch = watchCancellation(api, options.runId, command.id, abortController);
|
||||
|
||||
const stopHeartbeat = startHeartbeat(api, options.runId, runner.id, leaseMs);
|
||||
const idleTimeoutMs = options.idleTimeoutMs ?? 120_000;
|
||||
const pollIntervalMs = options.pollIntervalMs ?? 2_000;
|
||||
const commandResults: CommandExecutionResult[] = [];
|
||||
let workspacePath: string | undefined;
|
||||
let materializationAttempted = false;
|
||||
let materializationFailure: { failureKind: FailureKind; terminalStatus: TerminalStatus; message: string } | null = null;
|
||||
|
||||
try {
|
||||
const materialized = await materializeResourceBundle(claimed.resourceBundleRef ?? null, options.env ?? process.env);
|
||||
if (materialized) {
|
||||
workspacePath = materialized.workspacePath;
|
||||
await api.appendEvent(options.runId, { type: "backend_status", payload: materialized.event });
|
||||
let idleSince = Date.now();
|
||||
let firstPoll = true;
|
||||
while (true) {
|
||||
const currentRun = await api.getRun(options.runId);
|
||||
if (isTerminalRun(currentRun)) return { runner, claimed, terminalStatus: currentRun.terminalStatus, failureKind: currentRun.failureKind, run: currentRun, commandsProcessed: commandResults.length, commandResults, stopped: "run-terminal" } as JsonRecord;
|
||||
|
||||
const commandsResponse = await api.pollCommands(options.runId, { afterSeq: 0, limit: 20, ...(firstPoll && options.commandId ? { commandId: options.commandId } : {}) });
|
||||
firstPoll = false;
|
||||
const command = commandsResponse.selected;
|
||||
if (!command) {
|
||||
if (options.oneShot === true) return noPendingResult(runner, claimed, commandResults, commandsResponse.items.length, "one-shot-no-pending");
|
||||
if (Date.now() - idleSince >= idleTimeoutMs) return noPendingResult(runner, claimed, commandResults, commandsResponse.items.length, "idle-timeout");
|
||||
await sleep(pollIntervalMs);
|
||||
continue;
|
||||
}
|
||||
|
||||
idleSince = Date.now();
|
||||
if (!materializationAttempted) {
|
||||
materializationAttempted = true;
|
||||
try {
|
||||
const materialized = await materializeResourceBundle(claimed.resourceBundleRef ?? null, options.env ?? process.env);
|
||||
if (materialized) {
|
||||
workspacePath = materialized.workspacePath;
|
||||
await api.appendEvent(options.runId, { type: "backend_status", payload: { ...materialized.event, commandId: command.id, attemptId, runnerId: runner.id } });
|
||||
}
|
||||
} catch (error) {
|
||||
const failureKind = failureKindFromError(error);
|
||||
materializationFailure = { failureKind, terminalStatus: terminalStatusForFailure(failureKind), message: errorMessage(error) };
|
||||
}
|
||||
}
|
||||
|
||||
const result = materializationFailure
|
||||
? await reportCommandFailure(api, options.runId, command.id, runner, attemptId, materializationFailure, "runner:resource-bundle")
|
||||
: await executeCommand(api, options, command, runner, attemptId, workspacePath);
|
||||
commandResults.push(result);
|
||||
if (options.oneShot === true) {
|
||||
const run = await api.getRun(options.runId);
|
||||
return { runner, commandId: command.id, terminalStatus: result.terminalStatus, failureKind: result.failureKind, run, commandsProcessed: commandResults.length, commandResults, stopped: "one-shot" } as JsonRecord;
|
||||
}
|
||||
}
|
||||
await assertNotCancelled(api, options.runId, command.id);
|
||||
const result = await runBackendTurn(claimed, command, { ...options, ...(workspacePath ? { workspacePath } : {}), abortSignal: abortController.signal });
|
||||
for (const event of result.events) {
|
||||
if (event.type !== "terminal_status") await api.appendEvent(options.runId, event);
|
||||
}
|
||||
await api.reportCommandStatus(command.id, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage });
|
||||
const finalRun = await api.reportStatus(options.runId, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage, ...(result.threadId ? { threadId: result.threadId } : {}), ...(result.turnId ? { turnId: result.turnId } : {}) }) as RunRecord;
|
||||
return { runner, commandId: command.id, terminalStatus: result.terminalStatus, failureKind: result.failureKind, run: finalRun } as JsonRecord;
|
||||
} catch (error) {
|
||||
const failureKind = failureKindFromError(error);
|
||||
const terminalStatus = terminalStatusForFailure(failureKind);
|
||||
const message = errorMessage(error);
|
||||
await api.appendEvent(options.runId, { type: "error", payload: { failureKind, message, phase: "runner:execute", attemptId, runnerId: runner.id } });
|
||||
await api.reportCommandStatus(command.id, { terminalStatus, failureKind, failureMessage: message });
|
||||
await api.appendEvent(options.runId, { type: "error", payload: { failureKind, message, phase: "runner:loop", attemptId, runnerId: runner.id } });
|
||||
const finalRun = await api.reportStatus(options.runId, { terminalStatus, failureKind, failureMessage: message }) as RunRecord;
|
||||
return { runner, commandId: command.id, terminalStatus, failureKind, run: finalRun } as JsonRecord;
|
||||
return { runner, terminalStatus, failureKind, run: finalRun, commandsProcessed: commandResults.length, commandResults } as JsonRecord;
|
||||
} finally {
|
||||
stopHeartbeat();
|
||||
}
|
||||
}
|
||||
|
||||
async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions, command: CommandRecord, runner: RunnerRecord, attemptId: string, workspacePath: string | undefined): Promise<CommandExecutionResult> {
|
||||
await api.ackCommand(command.id);
|
||||
const acked = await api.getCommand(options.runId, command.id);
|
||||
if (acked.state === "cancelled") return await reportCancelled(api, options.runId, command.id, runner, attemptId, "command cancelled before backend start");
|
||||
await assertNotCancelled(api, options.runId, command.id);
|
||||
const abortController = new AbortController();
|
||||
const stopCancelWatch = watchCancellation(api, options.runId, command.id, abortController);
|
||||
try {
|
||||
const latestRun = await api.getRun(options.runId);
|
||||
const result = await runBackendTurn(latestRun, command, { ...options, ...(workspacePath ? { workspacePath } : {}), abortSignal: abortController.signal });
|
||||
for (const event of result.events) {
|
||||
await api.appendEvent(options.runId, annotateCommandEvent(event, command.id, attemptId, runner.id));
|
||||
}
|
||||
await api.reportCommandStatus(command.id, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage, ...(result.threadId ? { threadId: result.threadId } : {}), ...(result.turnId ? { turnId: result.turnId } : {}) });
|
||||
return { commandId: command.id, terminalStatus: result.terminalStatus, failureKind: result.failureKind } as CommandExecutionResult;
|
||||
} catch (error) {
|
||||
const failureKind = failureKindFromError(error);
|
||||
const failure = { failureKind, terminalStatus: terminalStatusForFailure(failureKind), message: errorMessage(error) };
|
||||
return await reportCommandFailure(api, options.runId, command.id, runner, attemptId, failure, "runner:execute");
|
||||
} finally {
|
||||
stopCancelWatch();
|
||||
}
|
||||
@@ -118,8 +173,49 @@ function watchCancellation(api: RunnerManagerApi, runId: string, commandId: stri
|
||||
};
|
||||
}
|
||||
|
||||
async function reportCancelled(api: RunnerManagerApi, runId: string, commandId: string, runner: RunnerRecord, claimed: RunRecord, message: string): Promise<JsonRecord> {
|
||||
await api.reportCommandStatus(commandId, { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: message });
|
||||
const finalRun = await api.reportStatus(runId, { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: message });
|
||||
return { runner, commandId, claimed, terminalStatus: "cancelled", failureKind: "cancelled", run: finalRun };
|
||||
function startHeartbeat(api: RunnerManagerApi, runId: string, runnerId: string, leaseMs: number): () => void {
|
||||
let stopped = false;
|
||||
const beat = async (): Promise<void> => {
|
||||
if (stopped) return;
|
||||
try {
|
||||
await api.heartbeat(runId, runnerId, leaseMs);
|
||||
} catch {
|
||||
// The next manager call will surface lease or run-terminal details.
|
||||
}
|
||||
};
|
||||
const timer = setInterval(() => { void beat(); }, Math.max(1_000, Math.floor(leaseMs / 3)));
|
||||
return () => {
|
||||
stopped = true;
|
||||
clearInterval(timer);
|
||||
};
|
||||
}
|
||||
|
||||
function annotateCommandEvent(event: BackendEvent, commandId: string, attemptId: string, runnerId: string): BackendEvent {
|
||||
return { ...event, payload: { ...event.payload, commandId, attemptId, runnerId } };
|
||||
}
|
||||
|
||||
async function reportCommandFailure(api: RunnerManagerApi, runId: string, commandId: string, runner: RunnerRecord, attemptId: string, failure: { terminalStatus: TerminalStatus; failureKind: FailureKind; message: string }, phase: string): Promise<CommandExecutionResult> {
|
||||
await api.appendEvent(runId, { type: "error", payload: { failureKind: failure.failureKind, message: failure.message, phase, commandId, attemptId, runnerId: runner.id } });
|
||||
await api.appendEvent(runId, { type: "terminal_status", payload: { terminalStatus: failure.terminalStatus, failureKind: failure.failureKind, message: failure.message, commandId, attemptId, runnerId: runner.id } });
|
||||
await api.reportCommandStatus(commandId, { terminalStatus: failure.terminalStatus, failureKind: failure.failureKind, failureMessage: failure.message });
|
||||
return { commandId, terminalStatus: failure.terminalStatus, failureKind: failure.failureKind } as CommandExecutionResult;
|
||||
}
|
||||
|
||||
async function reportCancelled(api: RunnerManagerApi, runId: string, commandId: string, runner: RunnerRecord, attemptId: string, message: string): Promise<CommandExecutionResult> {
|
||||
await api.reportCommandStatus(commandId, { terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: message });
|
||||
await api.appendEvent(runId, { type: "backend_status", payload: { phase: "turn-cancelled", commandId, attemptId, runnerId: runner.id, failureKind: "cancelled", message } });
|
||||
await api.appendEvent(runId, { type: "terminal_status", payload: { terminalStatus: "cancelled", failureKind: "cancelled", message, commandId, attemptId, runnerId: runner.id } });
|
||||
return { commandId, terminalStatus: "cancelled", failureKind: "cancelled" };
|
||||
}
|
||||
|
||||
function noPendingResult(runner: RunnerRecord, claimed: RunRecord, commandResults: CommandExecutionResult[], polledCommands: number, stopped: string): JsonRecord {
|
||||
if (commandResults.length > 0) {
|
||||
const last = commandResults.at(-1)!;
|
||||
return { runner, claimed, terminalStatus: last.terminalStatus, failureKind: last.failureKind, commandsProcessed: commandResults.length, commandResults, polledCommands, stopped };
|
||||
}
|
||||
return { runner, claimed, terminalStatus: "blocked", failureKind: "schema-invalid", commandsProcessed: 0, commandResults, polledCommands, stopped };
|
||||
}
|
||||
|
||||
function sleep(ms: number): Promise<void> {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
@@ -14,28 +14,29 @@ const selfTest: SelfTestCase = async (context) => {
|
||||
try {
|
||||
const client = new ManagerClient(server.baseUrl);
|
||||
const happy = await createRunWithCommand(client, context, "hello", "selftest-turn", 15_000);
|
||||
const result = await runOnce({ managerUrl: server.baseUrl, runId: happy.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome } });
|
||||
const result = await runOnce({ managerUrl: server.baseUrl, runId: happy.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome }, oneShot: true });
|
||||
assert.equal(result.terminalStatus, "completed");
|
||||
assert.equal(typeof (result.runner as { id?: unknown }).id, "string");
|
||||
const events = await client.get(`/api/v1/runs/${happy.runId}/events?afterSeq=0&limit=100`) as { items?: Array<{ type: string; payload: unknown }> };
|
||||
assert.ok(events.items?.some((event) => event.type === "assistant_message"));
|
||||
assert.ok(events.items?.some((event) => event.type === "backend_status" && JSON.stringify(event.payload).includes("run-claimed")));
|
||||
assertNoSecretLeak(events);
|
||||
const finalRun = await client.get(`/api/v1/runs/${happy.runId}`) as { terminalStatus?: string };
|
||||
assert.equal(finalRun.terminalStatus, "completed");
|
||||
const finalRun = await client.get(`/api/v1/runs/${happy.runId}`) as { terminalStatus?: string | null; status?: string };
|
||||
assert.equal(finalRun.terminalStatus, null);
|
||||
assert.equal(finalRun.status, "claimed");
|
||||
const finalCommand = await client.get(`/api/v1/runs/${happy.runId}/commands/${happy.commandId}`) as { state?: string };
|
||||
assert.equal(finalCommand.state, "completed");
|
||||
|
||||
const projectedHome = path.join(context.tmp, "runtime-codex-home");
|
||||
const projected = await createRunWithCommand(client, { workspace: context.workspace, codexHome: projectedHome }, "hello projected", "selftest-projected-codex-home", 15_000);
|
||||
const projectedResult = await runOnce({ managerUrl: server.baseUrl, runId: projected.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: projectedHome, env: { CODEX_HOME: projectedHome, AGENTRUN_CODEX_SECRET_HOME: context.codexHome } });
|
||||
const projectedResult = await runOnce({ managerUrl: server.baseUrl, runId: projected.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: projectedHome, env: { CODEX_HOME: projectedHome, AGENTRUN_CODEX_SECRET_HOME: context.codexHome }, oneShot: true });
|
||||
assert.equal(projectedResult.terminalStatus, "completed");
|
||||
await access(path.join(projectedHome, "auth.json"));
|
||||
await access(path.join(projectedHome, "config.toml"));
|
||||
|
||||
const deepseekHome = path.join(context.tmp, "runtime-deepseek-home");
|
||||
const deepseek = await createRunWithCommand(client, { ...context, backendProfile: "deepseek" }, "hello deepseek", "selftest-deepseek-turn", 15_000);
|
||||
const deepseekResult = await runOnce({ managerUrl: server.baseUrl, runId: deepseek.runId, backendProfile: "deepseek", codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: deepseekHome, env: { CODEX_HOME: deepseekHome, AGENTRUN_CODEX_SECRET_HOME: context.deepseekHome } });
|
||||
const deepseekResult = await runOnce({ managerUrl: server.baseUrl, runId: deepseek.runId, backendProfile: "deepseek", codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: deepseekHome, env: { CODEX_HOME: deepseekHome, AGENTRUN_CODEX_SECRET_HOME: context.deepseekHome }, oneShot: true });
|
||||
assert.equal(deepseekResult.terminalStatus, "completed");
|
||||
await access(path.join(deepseekHome, "auth.json"));
|
||||
await access(path.join(deepseekHome, "config.toml"));
|
||||
@@ -49,12 +50,12 @@ const selfTest: SelfTestCase = async (context) => {
|
||||
);
|
||||
|
||||
const configModel = await createRunWithCommand(client, context, "hello config model", "selftest-config-model", 15_000);
|
||||
const configModelResult = await runOnce({ managerUrl: server.baseUrl, runId: configModel.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "reject-unexpected-model" } });
|
||||
const configModelResult = await runOnce({ managerUrl: server.baseUrl, runId: configModel.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "reject-unexpected-model" }, oneShot: true });
|
||||
assert.equal(configModelResult.terminalStatus, "completed", "unspecified model should be omitted so Codex config.toml remains authoritative");
|
||||
|
||||
const explicitModel = await createRunWithCommand(client, context, "hello explicit model placeholder", "selftest-explicit-model-placeholder", 15_000);
|
||||
const explicitCommand = await client.post(`/api/v1/runs/${explicitModel.runId}/commands`, { type: "turn", payload: { prompt: "hello explicit model", model: "gpt-5.5" }, idempotencyKey: "selftest-explicit-model-command" }) as { id: string };
|
||||
const explicitModelResult = await runOnce({ managerUrl: server.baseUrl, runId: explicitModel.runId, commandId: explicitCommand.id, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "require-explicit-model" } });
|
||||
const explicitModelResult = await runOnce({ managerUrl: server.baseUrl, runId: explicitModel.runId, commandId: explicitCommand.id, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "require-explicit-model" }, oneShot: true });
|
||||
assert.equal(explicitModelResult.terminalStatus, "completed", "explicit command payload model should still be forwarded");
|
||||
|
||||
await runFailureCase({ client, managerUrl: server.baseUrl, context, mode: "missing-turn-result", expectedStatus: "failed", expectedFailureKind: "backend-response-invalid" });
|
||||
@@ -83,6 +84,7 @@ async function runFailureCase(options: { client: ManagerClient; managerUrl: stri
|
||||
codexArgs: options.context.fakeCodexArgs,
|
||||
codexHome: options.context.codexHome,
|
||||
env: { CODEX_HOME: options.context.codexHome, AGENTRUN_FAKE_CODEX_MODE: options.mode },
|
||||
oneShot: true,
|
||||
}) as JsonRecord;
|
||||
assert.equal(result.terminalStatus, options.expectedStatus, options.mode);
|
||||
assert.equal(result.failureKind, options.expectedFailureKind, options.mode);
|
||||
@@ -113,12 +115,15 @@ async function runSecretFailureCase(options: { client: ManagerClient; managerUrl
|
||||
codexArgs: options.context.fakeCodexArgs,
|
||||
codexHome: path.join(options.context.tmp, "missing-codex-home"),
|
||||
env: { CODEX_HOME: path.join(options.context.tmp, "missing-codex-home") },
|
||||
oneShot: true,
|
||||
}) as JsonRecord;
|
||||
assert.equal(result.terminalStatus, "blocked", "secret unavailable");
|
||||
assert.equal(result.failureKind, "secret-unavailable", "secret unavailable");
|
||||
const run = await options.client.get(`/api/v1/runs/${item.runId}`) as { status?: string; failureKind?: string };
|
||||
assert.equal(run.status, "blocked", "secret unavailable");
|
||||
assert.equal(run.failureKind, "secret-unavailable", "secret unavailable");
|
||||
const command = await options.client.get(`/api/v1/runs/${item.runId}/commands/${item.commandId}`) as { state?: string };
|
||||
assert.equal(command.state, "failed", "secret unavailable command state");
|
||||
const envelope = await options.client.get(`/api/v1/runs/${item.runId}/commands/${item.commandId}/result`) as JsonRecord;
|
||||
assert.equal(envelope.terminalStatus, "failed", "secret unavailable result terminal");
|
||||
assert.equal(envelope.failureKind, "secret-unavailable", "secret unavailable result kind");
|
||||
}
|
||||
|
||||
async function runSpawnFailureCase(options: { client: ManagerClient; managerUrl: string; context: SelfTestContext }): Promise<void> {
|
||||
@@ -130,6 +135,7 @@ async function runSpawnFailureCase(options: { client: ManagerClient; managerUrl:
|
||||
codexArgs: [],
|
||||
codexHome: options.context.codexHome,
|
||||
env: { CODEX_HOME: options.context.codexHome },
|
||||
oneShot: true,
|
||||
}) as JsonRecord;
|
||||
assert.equal(result.terminalStatus, "failed", "spawn failure");
|
||||
assert.equal(result.failureKind, "backend-spawn-failed", "spawn failure");
|
||||
|
||||
@@ -69,7 +69,7 @@ console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kin
|
||||
);
|
||||
|
||||
const sessionRun = await createHwlabRun(client, context, bundle, "hwlab-session-resume", "hello session", "hwlab-command-session");
|
||||
const runResult = await runOnce({ managerUrl: server.baseUrl, runId: sessionRun.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "workspaces") } });
|
||||
const runResult = await runOnce({ managerUrl: server.baseUrl, runId: sessionRun.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "workspaces") }, oneShot: true });
|
||||
assert.equal(runResult.terminalStatus, "completed");
|
||||
const session = await store.getSession("hwlab-session-resume");
|
||||
assert.equal(session?.threadId, "thread_selftest_1");
|
||||
@@ -84,14 +84,29 @@ console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kin
|
||||
const resumedRun = await client.get(`/api/v1/runs/${resumed.runId}`) as JsonRecord;
|
||||
assert.equal(((resumedRun.sessionRef as JsonRecord).threadId), "thread_selftest_1");
|
||||
|
||||
const multiTurn = await createHwlabRun(client, context, bundle, "hwlab-session-multiturn", "hello first turn", "hwlab-command-multiturn-1");
|
||||
const multiturnRunner = runOnce({ managerUrl: server.baseUrl, runId: multiTurn.runId, commandId: multiTurn.commandId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "workspaces-multiturn") }, idleTimeoutMs: 500, pollIntervalMs: 50 });
|
||||
await waitForCommandState(client, multiTurn.runId, multiTurn.commandId, "completed");
|
||||
const secondCommand = await client.post(`/api/v1/runs/${multiTurn.runId}/commands`, { type: "turn", payload: { prompt: "hello second turn", traceId: "hwlab-command-multiturn-2" }, idempotencyKey: "hwlab-command-multiturn-2" }) as { id: string };
|
||||
await waitForCommandState(client, multiTurn.runId, secondCommand.id, "completed");
|
||||
const multiturnResult = await multiturnRunner as JsonRecord;
|
||||
assert.equal(multiturnResult.commandsProcessed, 2);
|
||||
const multiEventsResponse = await client.get(`/api/v1/runs/${multiTurn.runId}/events?afterSeq=0&limit=200`) as { items?: Array<{ type?: string; payload?: JsonRecord }> };
|
||||
const multiEvents = multiEventsResponse.items ?? [];
|
||||
assert.equal(multiEvents.filter((event) => event.type === "backend_status" && event.payload?.phase === "resource-bundle-materialized").length, 1);
|
||||
assert.equal(multiEvents.filter((event) => event.type === "backend_status" && event.payload?.phase === "command-terminal").length, 2);
|
||||
const secondEnvelope = await client.get(`/api/v1/runs/${multiTurn.runId}/commands/${secondCommand.id}/result`) as JsonRecord;
|
||||
assert.equal(secondEnvelope.terminalStatus, "completed");
|
||||
assert.equal(secondEnvelope.reply, "fake codex stdio reply");
|
||||
|
||||
const runningCancel = await createHwlabRun(client, context, bundle, "hwlab-session-cancel-running", "cancel running", "hwlab-command-cancel-running", 10_000);
|
||||
const running = runOnce({ managerUrl: server.baseUrl, runId: runningCancel.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "missing-terminal", AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "workspaces-running-cancel") } });
|
||||
const running = runOnce({ managerUrl: server.baseUrl, runId: runningCancel.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_FAKE_CODEX_MODE: "missing-terminal", AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "workspaces-running-cancel") }, oneShot: true });
|
||||
await waitForCommandState(client, runningCancel.runId, runningCancel.commandId, "acknowledged");
|
||||
await client.post(`/api/v1/commands/${runningCancel.commandId}/cancel`, { reason: "self-test running cancel" });
|
||||
const runningResult = await running;
|
||||
assert.equal(runningResult.terminalStatus, "cancelled");
|
||||
|
||||
return { name: "hwlab-manual-dispatch", tests: ["runner-job-idempotency", "pending-cancel", "result-envelope", "session-ref-resume", "resource-bundle-materialization", "running-cancel"] };
|
||||
return { name: "hwlab-manual-dispatch", tests: ["runner-job-idempotency", "pending-cancel", "result-envelope", "session-ref-resume", "resource-bundle-materialization", "same-run-runner-multiturn", "running-cancel"] };
|
||||
} finally {
|
||||
await new Promise<void>((resolve) => server.server.close(() => resolve()));
|
||||
}
|
||||
|
||||
@@ -70,7 +70,7 @@ async function assertEventContractAndCompletedSemantics(client: ManagerClient, c
|
||||
const happy = await createRunWithCommand(client, context, "hello event contract", "selftest-event-contract", 15_000);
|
||||
await client.post(`/api/v1/runs/${happy.runId}/events`, { type: "tool_call", payload: { method: "selftest/tool", item: { command: "echo ok" } } });
|
||||
await client.post(`/api/v1/runs/${happy.runId}/events`, { type: "diff", payload: { filesChanged: 1, summary: "selftest diff" } });
|
||||
const result = await runOnce({ managerUrl, runId: happy.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome } });
|
||||
const result = await runOnce({ managerUrl, runId: happy.runId, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome }, oneShot: true });
|
||||
assert.equal(result.terminalStatus, "completed");
|
||||
const eventsResponse = await client.get(`/api/v1/runs/${happy.runId}/events?afterSeq=0&limit=100`) as { items?: RunEvent[] };
|
||||
const events = eventsResponse.items ?? [];
|
||||
@@ -81,7 +81,7 @@ async function assertEventContractAndCompletedSemantics(client: ManagerClient, c
|
||||
const envelope = await client.get(`/api/v1/runs/${happy.runId}/commands/${happy.commandId}/result`) as JsonRecord;
|
||||
assert.equal(envelope.completed, true);
|
||||
assert.equal(envelope.terminalStatus, "completed");
|
||||
assert.equal(envelope.terminalSource, "terminal_status-event");
|
||||
assert.equal(envelope.terminalSource, "command-record");
|
||||
assertNoSecretLeak({ eventsResponse, envelope });
|
||||
|
||||
const partial = await createRunWithCommand(client, context, "partial should not complete", "selftest-partial-not-completed", 15_000);
|
||||
@@ -132,7 +132,7 @@ async function assertResourceBundleFailure(client: ManagerClient, context: SelfT
|
||||
resourceBundleRef: { kind: "git", repoUrl: repo.repoUrl, commitId: "0000000000000000000000000000000000000000", submodules: false, lfs: false },
|
||||
}) as { id: string };
|
||||
const command = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "bad bundle" }, idempotencyKey: "selftest-bad-bundle" }) as { id: string };
|
||||
const result = await runOnce({ managerUrl, runId: run.id, commandId: command.id, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "bad-bundle-workspaces") } }) as JsonRecord;
|
||||
const result = await runOnce({ managerUrl, runId: run.id, commandId: command.id, codexCommand: context.fakeCodexCommand, codexArgs: context.fakeCodexArgs, codexHome: context.codexHome, env: { CODEX_HOME: context.codexHome, AGENTRUN_WORKSPACE_ROOT: path.join(context.tmp, "bad-bundle-workspaces") }, oneShot: true }) as JsonRecord;
|
||||
assert.equal(result.terminalStatus, "failed");
|
||||
assert.equal(result.failureKind, "infra-failed");
|
||||
const envelope = await client.get(`/api/v1/runs/${run.id}/commands/${command.id}/result`) as JsonRecord;
|
||||
|
||||
Reference in New Issue
Block a user