Merge pull request #204 from pikasTech/fix/203-codex-stdio-otel

[OTel] 追穿 codex app-server lifecycle 与 codex-stdio turn 状态
This commit is contained in:
Lyon
2026-06-20 15:33:45 +08:00
committed by GitHub
9 changed files with 423 additions and 28 deletions
+13 -1
View File
@@ -1,4 +1,4 @@
import type { BackendEvent, BackendTurnResult, CommandRecord, InitialPromptAssembly, RunRecord } from "../common/types.js";
import type { BackendEvent, BackendTurnResult, CommandRecord, InitialPromptAssembly, JsonRecord, RunRecord } from "../common/types.js";
import { CodexStdioBackendSession, runCodexStdioTurn, type CodexStdioTurnOptions } from "./codex-stdio.js";
import { backendProfileSpec } from "../common/backend-profiles.js";
@@ -16,6 +16,17 @@ export interface BackendAdapterOptions {
workspacePath?: string;
abortSignal?: AbortSignal;
initialPrompt?: InitialPromptAssembly;
otelContext?: {
run: RunRecord;
command: CommandRecord;
attemptId?: string | null;
runnerId?: string | null;
runnerJobId?: string | null;
jobName?: string | null;
podName?: string | null;
sourceCommit?: string | null;
logPath?: string | null;
} & JsonRecord;
onEvent?: (event: BackendEvent) => void | Promise<void>;
onActiveTurn?: (control: BackendActiveTurnControl) => void | (() => void);
env?: NodeJS.ProcessEnv;
@@ -64,6 +75,7 @@ export function backendTurnOptions(run: RunRecord, command: CommandRecord, optio
if (options.codexArgs) turnOptions.args = options.codexArgs;
if (options.env) turnOptions.env = options.env;
if (options.initialPrompt) turnOptions.initialPrompt = options.initialPrompt;
if (options.otelContext) turnOptions.otelContext = options.otelContext;
if (options.codexHome) turnOptions.codexHome = options.codexHome;
if (options.abortSignal) turnOptions.abortSignal = options.abortSignal;
if (options.onEvent) turnOptions.onEvent = options.onEvent;
+277 -15
View File
@@ -4,10 +4,11 @@ import { accessSync, constants as fsConstants, readdirSync, readFileSync } from
import { chmod, copyFile, mkdir } from "node:fs/promises";
import path from "node:path";
import * as readline from "node:readline";
import type { BackendEvent, BackendProfile, BackendTurnResult, FailureKind, InitialPromptAssembly, JsonRecord, JsonValue, TerminalStatus } from "../common/types.js";
import type { BackendEvent, BackendProfile, BackendTurnResult, CommandRecord, FailureKind, InitialPromptAssembly, JsonRecord, JsonValue, RunRecord, TerminalStatus } from "../common/types.js";
import { redactJson, redactText } from "../common/redaction.js";
import { backendProfileSpec } from "../common/backend-profiles.js";
import { boundedTextSummary, commandOutputPayload } from "../common/output.js";
import { emitAgentRunOtelSpan } from "../common/otel-trace.js";
const codexProtocol = "codex-app-server-jsonrpc-stdio";
const defaultCodexArgs = ["app-server", "--listen", "stdio://"];
@@ -16,6 +17,7 @@ const stderrEventChars = 4_000;
const requestTimeoutCapMs = 30_000;
const assistantDeltaProgressMinChars = 500;
const assistantDeltaProgressLimitChars = 1_200;
const defaultIdleWarningMs = 8_000;
const childEnvSummaryKeys = [
"CODEX_HOME",
@@ -53,11 +55,31 @@ export interface CodexStdioTurnOptions {
env?: NodeJS.ProcessEnv;
codexHome?: string;
initialPrompt?: InitialPromptAssembly;
otelContext?: CodexStdioOtelContext;
abortSignal?: AbortSignal;
onEvent?: (event: BackendEvent) => void | Promise<void>;
onActiveTurn?: (control: CodexActiveTurnControl) => void | (() => void);
}
/**
 * Correlation data a caller attaches to a codex stdio turn so OTel spans can be emitted for it.
 * `emitCodexOtelSpan` returns without emitting anything when a turn carries no such context.
 */
export interface CodexStdioOtelContext extends JsonRecord {
// Run and command records; forwarded to `emitAgentRunOtelSpan` as the span's run / command.
run: RunRecord;
command: CommandRecord;
// Optional identifiers copied into span attributes by `codexOtelAttributes` (reported as null when absent).
attemptId?: string | null;
runnerId?: string | null;
runnerJobId?: string | null;
jobName?: string | null;
podName?: string | null;
sourceCommit?: string | null;
// Reported through `pathSummary(...)` rather than verbatim.
logPath?: string | null;
}
/**
 * Snapshot of the OTel inputs built by `codexLifecycleOtelContext` and kept on the backend session,
 * so `emitCodexOtelSpanFromLifecycle` can report `codex_app_server.exit` without the turn options.
 */
interface CodexLifecycleOtelContext {
// Environment handed to `emitAgentRunOtelSpan` when the lifecycle span is emitted.
env: NodeJS.ProcessEnv;
run: RunRecord;
command: CommandRecord;
// Base span attributes; span-specific attributes are merged on top of these.
attributes: JsonRecord;
}
export interface CodexActiveTurnControl {
threadId: string;
turnId: string;
@@ -331,6 +353,7 @@ export async function runCodexStdioTurn(options: CodexStdioTurnOptions): Promise
export class CodexStdioBackendSession {
private client: CodexStdioClient | null = null;
private clientKey: string | null = null;
private lifecycleOtel: CodexLifecycleOtelContext | null = null;
async runTurn(options: CodexStdioTurnOptions): Promise<BackendTurnResult> {
return await runCodexStdioTurnWithSession(options, this);
@@ -343,13 +366,17 @@ export class CodexStdioBackendSession {
this.clientKey = null;
client.stop();
const closeInfo = await client.closedPromise;
emitCodexOtelSpanFromLifecycle("codex_app_server.exit", this.lifecycleOtel, { status: closeInfo.failureKind ? "error" : "ok", error: closeInfo.message ?? closeInfo.failureKind ?? undefined, attributes: { ...closeEventAttributes(closeInfo), failureKind: closeInfo.failureKind } });
this.lifecycleOtel = null;
return [{ type: "backend_status", payload: { phase: "codex-app-server-closed", appServerExit: closeEvent(closeInfo) } }];
}
async getClient(options: CodexStdioTurnOptions, env: NodeJS.ProcessEnv, emitEvent: (event: BackendEvent) => void): Promise<CodexStdioClient> {
const key = codexClientKey(options, env);
this.lifecycleOtel = codexLifecycleOtelContext(options, env);
if (this.client && !this.client.isClosed && this.clientKey === key) {
emitEvent({ type: "backend_status", payload: { phase: "codex-app-server:reused", ...backendMetadata(options), protocol: codexProtocol } });
emitCodexOtelSpan("codex_app_server.reused", options, env, { command: options.command ?? "codex", argsFingerprint: shortHash(JSON.stringify(options.args ?? defaultCodexArgs)) });
return this.client;
}
const closeEvents = await this.close();
@@ -364,6 +391,8 @@ export class CodexStdioBackendSession {
config: codexConfigSummary(resolveCodexHome(options), options.backendProfile ?? "codex"),
},
});
const appServerStartMs = Date.now();
emitCodexOtelSpan("codex_app_server.starting", options, env, { command: options.command ?? "codex", argsFingerprint: shortHash(JSON.stringify(options.args ?? defaultCodexArgs)), cwd: pathSummary(options.cwd), codexHome: pathSummary(resolveCodexHome(options)) }, { startTimeMs: appServerStartMs });
const clientOptions: ConstructorParameters<typeof CodexStdioClient>[0] = {
cwd: options.cwd,
env,
@@ -371,14 +400,20 @@ export class CodexStdioBackendSession {
};
if (options.command) clientOptions.command = options.command;
if (options.args) clientOptions.args = options.args;
this.client = new CodexStdioClient(clientOptions);
this.clientKey = key;
const requestTimeoutMs = Math.min(positiveTimeout(options.timeoutMs), requestTimeoutCapMs);
const initializeResult = requireResponseRecord(await this.client.request("initialize", { clientInfo: { name: "agentrun", title: "AgentRun", version: "0.1.0" }, capabilities: { experimentalApi: true } }, requestTimeoutMs), "initialize");
validateInitializeResponse(initializeResult);
this.client.notify("initialized", {});
emitEvent({ type: "backend_status", payload: { phase: "initialize:completed", ...backendMetadata(options), protocol: codexProtocol } });
return this.client;
try {
this.client = new CodexStdioClient(clientOptions);
this.clientKey = key;
const requestTimeoutMs = Math.min(positiveTimeout(options.timeoutMs), requestTimeoutCapMs);
const initializeResult = requireResponseRecord(await this.client.request("initialize", { clientInfo: { name: "agentrun", title: "AgentRun", version: "0.1.0" }, capabilities: { experimentalApi: true } }, requestTimeoutMs), "initialize");
validateInitializeResponse(initializeResult);
this.client.notify("initialized", {});
emitEvent({ type: "backend_status", payload: { phase: "initialize:completed", ...backendMetadata(options), protocol: codexProtocol } });
emitCodexOtelSpan("codex_app_server.started", options, env, { command: options.command ?? "codex", argsFingerprint: shortHash(JSON.stringify(options.args ?? defaultCodexArgs)) }, { startTimeMs: appServerStartMs, endTimeMs: Date.now() });
return this.client;
} catch (error) {
emitCodexOtelSpan("codex_app_server.exit", options, env, { phase: "app-server-start", command: options.command ?? "codex", failureKind: normalizeFailure(error).failureKind }, { startTimeMs: appServerStartMs, endTimeMs: Date.now(), status: "error", error });
throw error;
}
}
private notificationHandlers = new Set<(message: JsonRecord) => void>();
@@ -423,6 +458,11 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
const assistantDeltaProgress = createAssistantDeltaProgressState();
const completedAssistantMessages: CompletedAssistantMessage[] = [];
const suppressedNotifications = createSuppressedNotificationSummary();
let waitingFor = "codex-app-server";
let lastNotificationMethod: string | null = null;
let lastActivityAt = Date.now();
let lastToolCall: JsonRecord | null = null;
let missingTerminalAfterToolReported = false;
let threadId: string | undefined = options.threadId;
let turnId: string | undefined;
let terminal: { status: TerminalStatus; failureKind: FailureKind | null; message: string | null } | null = null;
@@ -491,29 +531,56 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
};
options.abortSignal?.addEventListener("abort", abortTurn, { once: true });
const turnIdleTimeoutMs = positiveTimeout(options.timeoutMs);
const idleWarningMs = codexIdleWarningMs(env, turnIdleTimeoutMs);
let idleTimeout: NodeJS.Timeout | null = null;
let idleWarningTimeout: NodeJS.Timeout | null = null;
const scheduleIdleWarning = (): void => {
if (idleWarningTimeout) clearTimeout(idleWarningTimeout);
idleWarningTimeout = setTimeout(() => {
if (terminal) return;
const idleMs = Math.max(0, Date.now() - lastActivityAt);
const attrs = { waitingFor, idleMs, lastNotificationMethod, threadId: threadId ?? null, turnId: turnId ?? null, terminalStatus: null };
emitCodexOtelSpan("codex_stdio.idle_warning", options, env, attrs);
if (lastToolCall && !missingTerminalAfterToolReported) {
missingTerminalAfterToolReported = true;
emitCodexOtelSpan("codex_stdio.missing_terminal_after_tool", options, env, { ...attrs, lastToolCall });
}
}, idleWarningMs);
idleWarningTimeout.unref?.();
};
const refreshTurnActivity = (): void => {
if (terminal) return;
lastActivityAt = Date.now();
scheduleIdleWarning();
if (idleTimeout) clearTimeout(idleTimeout);
idleTimeout = setTimeout(() => {
if (terminal) return;
terminal = { status: "failed", failureKind: "backend-timeout", message: `codex stdio turn idle timed out after ${turnIdleTimeoutMs}ms without activity` };
emitEvent({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "turn:idle-timeout" } });
emitCodexOtelSpan("codex_stdio.idle_timeout", options, env, { waitingFor, idleMs: Math.max(0, Date.now() - lastActivityAt), lastNotificationMethod, threadId: threadId ?? null, turnId: turnId ?? null, terminalStatus: terminal.status, failureKind: terminal.failureKind }, { status: "error", error: terminal.message });
beginInterruptAndStop("idle timeout", "turn:idle-timeout");
terminalResolve();
}, turnIdleTimeoutMs);
idleTimeout.unref?.();
};
const stopTurnIdleTimeout = (): void => {
if (!idleTimeout) return;
clearTimeout(idleTimeout);
idleTimeout = null;
if (idleWarningTimeout) clearTimeout(idleWarningTimeout);
idleWarningTimeout = null;
};
refreshTurnActivity();
const stopNotifications = session.addNotificationHandler((message) => {
refreshTurnActivity();
lastNotificationMethod = typeof message.method === "string" ? message.method : "unknown";
emitCodexNotificationOtel(options, env, message, { threadId: threadId ?? null, turnId: turnId ?? null, waitingFor });
const normalized = normalizeCodexNotification(message, suppressedNotifications);
if (normalized.threadId) threadId = normalized.threadId;
if (normalized.turnId) turnId = normalized.turnId;
waitingFor = waitingForAfterNotification(message, normalized.terminal !== undefined);
const toolSummary = toolCallSummaryFromNotification(message);
if (toolSummary?.status === "completed" || toolSummary?.status === "failed") lastToolCall = toolSummary;
exposeActiveTurn(normalized.turnId ? "turn-notification" : "notification");
emitEvents(normalized.events);
if (normalized.assistantDelta) {
@@ -534,9 +601,19 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
client = await session.getClient(options, env, emitEvent);
const startThread = async (phasePrefix = "thread/start"): Promise<string> => {
const response = requireResponseRecord(await client!.request("thread/start", withOptionalModel({ cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox, serviceName: "agentrun" }, options.model), requestTimeoutMs), "thread/start");
waitingFor = "thread/start";
const startedAt = Date.now();
emitCodexOtelSpan("codex_stdio.thread_start.start", options, env, { waitingFor, requestedThreadId: null }, { startTimeMs: startedAt });
let response: JsonRecord;
try {
response = requireResponseRecord(await client!.request("thread/start", withOptionalModel({ cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox, serviceName: "agentrun" }, options.model), requestTimeoutMs), "thread/start");
} catch (error) {
emitCodexOtelSpan("codex_stdio.thread_start.failed", options, env, { waitingFor, failureKind: normalizeFailure(error).failureKind }, { startTimeMs: startedAt, endTimeMs: Date.now(), status: "error", error });
throw error;
}
const nextThreadId = requireNestedId(response, "thread/start", "thread");
emitEvent({ type: "backend_status", payload: { phase: `${phasePrefix}:completed`, threadId: nextThreadId } });
emitCodexOtelSpan("codex_stdio.thread_start.completed", options, env, { waitingFor, threadId: nextThreadId }, { startTimeMs: startedAt, endTimeMs: Date.now() });
return nextThreadId;
};
@@ -549,12 +626,17 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
emitEvent({ type: "backend_status", payload: { phase: "codex-rollout-storage-mounted", pvcName: sessionPvcName, pvcNamespace: sessionPvcNamespace, mountPath: sessionPvcMountPath, codexRolloutSubdir: codexRolloutSubdirEnv ?? "sessions", valuesPrinted: false } });
}
if (options.threadId) {
waitingFor = "thread/resume";
const startedAt = Date.now();
emitCodexOtelSpan("codex_stdio.thread_resume.start", options, env, { waitingFor, requestedThreadId: options.threadId }, { startTimeMs: startedAt });
try {
const threadResponse = requireResponseRecord(await client.request("thread/resume", withOptionalModel({ threadId: options.threadId, cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox }, options.model), requestTimeoutMs), "thread/resume");
threadId = requireNestedId(threadResponse, "thread/resume", "thread");
emitEvent({ type: "backend_status", payload: { phase: "thread/resume:completed", threadId } });
emitCodexOtelSpan("codex_stdio.thread_resume.completed", options, env, { waitingFor, requestedThreadId: options.threadId, threadId }, { startTimeMs: startedAt, endTimeMs: Date.now() });
} catch (error) {
const failure = normalizeFailure(error);
emitCodexOtelSpan("codex_stdio.thread_resume.failed", options, env, { waitingFor, requestedThreadId: options.threadId, failureKind: failure.failureKind }, { startTimeMs: startedAt, endTimeMs: Date.now(), status: "error", error });
if (sessionPvcName && isNoRolloutFoundMessage(failure.message)) {
throw new CodexStdioFailure(
"session-store-evicted",
@@ -571,17 +653,34 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
const promptInjection = initialPromptInjection(options.initialPrompt, willResumeThread);
emitEvent({ type: "backend_status", payload: { phase: "initial-prompt-assembly", initialPromptInjected: promptInjection.injected, reason: promptInjection.reason, initialPrompt: options.initialPrompt?.summary ?? { available: false, valuesPrinted: false }, valuesPrinted: false } });
const turnStart = await Promise.race([
client.request("turn/start", withOptionalModel({ threadId, input: textInputForUserMessage(options.prompt, promptInjection), cwd: options.cwd, approvalPolicy: options.approvalPolicy }, options.model), requestTimeoutMs).then((response) => ({ kind: "response" as const, response })),
terminalPromise.then(() => ({ kind: "terminal" as const })),
]);
waitingFor = "turn/start";
const turnStartStartedAt = Date.now();
emitCodexOtelSpan("codex_stdio.turn_start.start", options, env, { waitingFor, threadId: threadId ?? null }, { startTimeMs: turnStartStartedAt });
let turnStart: { kind: "response"; response: unknown } | { kind: "terminal" };
try {
turnStart = await Promise.race([
client.request("turn/start", withOptionalModel({ threadId, input: textInputForUserMessage(options.prompt, promptInjection), cwd: options.cwd, approvalPolicy: options.approvalPolicy }, options.model), requestTimeoutMs).then((response) => ({ kind: "response" as const, response })),
terminalPromise.then(() => ({ kind: "terminal" as const })),
]);
} catch (error) {
emitCodexOtelSpan("codex_stdio.turn_start.failed", options, env, { waitingFor, threadId: threadId ?? null, failureKind: normalizeFailure(error).failureKind }, { startTimeMs: turnStartStartedAt, endTimeMs: Date.now(), status: "error", error });
throw error;
}
if (turnStart.kind === "response") {
const turnResponse = requireResponseRecord(turnStart.response, "turn/start");
turnId = requireNestedId(turnResponse, "turn/start", "turn");
emitEvent({ type: "backend_status", payload: { phase: "turn/start:completed", turnId } });
waitingFor = "turn/completed";
emitCodexOtelSpan("codex_stdio.turn_start.completed", options, env, { waitingFor, threadId: threadId ?? null, turnId }, { startTimeMs: turnStartStartedAt, endTimeMs: Date.now() });
exposeActiveTurn("turn-start-response");
} else {
emitEvent({ type: "backend_status", payload: { phase: "turn/start:interrupted-before-response", threadId: threadId ?? null, turnId: turnId ?? null } });
const terminalSnapshot = terminal as unknown as { status: TerminalStatus; failureKind: FailureKind | null; message: string | null } | null;
if (terminalSnapshot?.status === "completed") {
emitCodexOtelSpan("codex_stdio.turn_start.completed", options, env, { waitingFor: "terminal-before-response", threadId: threadId ?? null, turnId: turnId ?? null, terminalStatus: terminalSnapshot.status, responseSource: "terminal-before-response" }, { startTimeMs: turnStartStartedAt, endTimeMs: Date.now() });
} else {
emitCodexOtelSpan("codex_stdio.turn_start.failed", options, env, { waitingFor, threadId: threadId ?? null, turnId: turnId ?? null, terminalStatus: terminalSnapshot?.status ?? null, failureKind: terminalSnapshot?.failureKind ?? null }, { startTimeMs: turnStartStartedAt, endTimeMs: Date.now(), status: "error", error: terminalSnapshot?.message ?? "turn/start interrupted before response" });
}
}
if (!terminal) {
@@ -594,7 +693,13 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess
emitEvent({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "transport:closed-before-terminal", appServerExit: closeEvent(race.closeInfo) } });
}
}
if (!terminal) terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server did not emit turn/completed" };
if (!terminal) {
if (lastToolCall && !missingTerminalAfterToolReported) {
missingTerminalAfterToolReported = true;
emitCodexOtelSpan("codex_stdio.missing_terminal_after_tool", options, env, { waitingFor, idleMs: Math.max(0, Date.now() - lastActivityAt), lastNotificationMethod, threadId: threadId ?? null, turnId: turnId ?? null, lastToolCall });
}
terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server did not emit turn/completed" };
}
} catch (error) {
if (!terminal) {
const failure = normalizeFailure(error);
@@ -1216,6 +1321,163 @@ function backendMetadata(options: CodexStdioTurnOptions): JsonRecord {
};
}
/** Optional timing / status fields accepted by the codex span emitters. */
type CodexOtelSpanFields = { startTimeMs?: number; endTimeMs?: number; status?: "ok" | "error"; error?: unknown };

/**
 * Shared fire-and-forget dispatcher for codex OTel spans.
 *
 * Only the span fields that were actually provided are forwarded, so the exporter's own defaults
 * apply to everything else. `emitAgentRunOtelSpan` is async, so any failure surfaces as a rejected
 * promise; it is handled here because spans are emitted from timers and notification handlers
 * where nothing awaits them, and telemetry must never turn into an unhandled rejection.
 */
function dispatchCodexOtelSpan(name: string, run: RunRecord, command: CommandRecord, env: NodeJS.ProcessEnv, span: CodexOtelSpanFields, attributes: JsonRecord): void {
  const { startTimeMs, endTimeMs, status, error } = span;
  void emitAgentRunOtelSpan(name, run, env, {
    command,
    scopeName: "agentrun.runner",
    ...(startTimeMs !== undefined ? { startTimeMs } : {}),
    ...(endTimeMs !== undefined ? { endTimeMs } : {}),
    ...(status !== undefined ? { status } : {}),
    ...(error !== undefined ? { error } : {}),
    attributes,
  }).catch(() => {
    // Best-effort telemetry: an export failure must not affect the turn.
  });
}

/**
 * Emits a runner-scoped OTel span for the current turn.
 *
 * No-op when `options.otelContext` is absent.
 *
 * @param name - Span name, e.g. `codex_stdio.turn_start.completed`.
 * @param options - Turn options; supply the OTel context and the base attributes.
 * @param env - Environment forwarded to the exporter and used for attribute derivation.
 * @param attributes - Span-specific attributes merged over the base attributes.
 * @param span - Optional start/end timestamps, status and error.
 */
function emitCodexOtelSpan(name: string, options: CodexStdioTurnOptions, env: NodeJS.ProcessEnv, attributes: JsonRecord = {}, span: { startTimeMs?: number; endTimeMs?: number; status?: "ok" | "error"; error?: unknown } = {}): void {
  const context = options.otelContext;
  if (!context) return;
  dispatchCodexOtelSpan(name, context.run, context.command, env, span, codexOtelAttributes(options, env, attributes));
}

/**
 * Emits a runner-scoped OTel span from a previously captured lifecycle context, for code paths
 * that no longer have the turn options at hand.
 *
 * No-op when `context` is null. `span.attributes` override the captured base attributes, and
 * `valuesPrinted` is always forced to `false`.
 */
function emitCodexOtelSpanFromLifecycle(name: string, context: CodexLifecycleOtelContext | null, span: { startTimeMs?: number; endTimeMs?: number; status?: "ok" | "error"; error?: unknown; attributes?: JsonRecord } = {}): void {
  if (!context) return;
  const attributes: JsonRecord = { ...context.attributes, ...(span.attributes ?? {}), valuesPrinted: false };
  dispatchCodexOtelSpan(name, context.run, context.command, context.env, span, attributes);
}
/**
 * Captures the OTel inputs of a turn (environment, run, command and base attributes) in a
 * standalone record. Returns null when the turn carries no OTel context.
 */
function codexLifecycleOtelContext(options: CodexStdioTurnOptions, env: NodeJS.ProcessEnv): CodexLifecycleOtelContext | null {
  const otel = options.otelContext;
  if (!otel) return null;
  const { run, command } = otel;
  const attributes = codexOtelAttributes(options, env);
  return { env, run, command, attributes };
}
/**
 * Builds the attribute record attached to every codex OTel span.
 *
 * Identity fields come from the turn's OTel context (null when missing), followed by backend and
 * placement fields. Entries in `extra` override the base values, and `valuesPrinted` is always
 * written last as `false`.
 */
function codexOtelAttributes(options: CodexStdioTurnOptions, env: NodeJS.ProcessEnv, extra: JsonRecord = {}): JsonRecord {
  const otel = options.otelContext;
  const backendProfile = options.backendProfile ?? otel?.run.backendProfile ?? "codex";

  const identity: JsonRecord = {
    runId: otel?.run.id ?? null,
    commandId: otel?.command.id ?? null,
    runnerJobId: otel?.runnerJobId ?? null,
    runnerId: otel?.runnerId ?? null,
    attemptId: otel?.attemptId ?? null,
    sessionId: otel?.run.sessionRef?.sessionId ?? null,
    threadId: otel?.run.sessionRef?.threadId ?? options.threadId ?? null,
  };

  const backend: JsonRecord = {
    backendProfile,
    codexHome: pathSummary(resolveCodexHome(options)),
    appServerSessionId: shortHash(codexClientKey(options, env)),
  };

  const rawLogPath = otel?.logPath;
  const placement: JsonRecord = {
    jobName: otel?.jobName ?? null,
    podName: otel?.podName ?? null,
    sourceCommit: otel?.sourceCommit ?? null,
    logPath: rawLogPath ? pathSummary(rawLogPath) : null,
  };

  return { ...identity, ...backend, ...placement, ...extra, valuesPrinted: false };
}
/**
 * Projects the close information onto span attributes: the exit code (as `exitCode`), the signal
 * and the stderr byte count / truncation flag, plus `valuesPrinted: false`.
 */
function closeEventAttributes(closeInfo: CodexStdioCloseInfo): JsonRecord {
  const { code, signal, stderrBytes, stderrTruncated } = closeInfo;
  return { exitCode: code, signal, stderrBytes, stderrTruncated, valuesPrinted: false };
}
/**
 * Resolves how long a turn may stay idle before an idle-warning span is emitted.
 *
 * `AGENTRUN_CODEX_STDIO_IDLE_WARNING_MS` is consulted first, then `AGENTRUN_CODEX_IDLE_WARNING_MS`;
 * the first one holding a finite positive number wins (floored, minimum 250 ms). A variable that
 * is set but empty or non-numeric is skipped rather than masking the next one, since `??` alone
 * only falls back on `undefined`.
 *
 * Without a usable override the default warning delay is used when the turn timeout is longer
 * than it, otherwise half the turn timeout (again with a 250 ms floor).
 *
 * @param env - Environment to read the overrides from.
 * @param turnTimeoutMs - Idle timeout of the turn, in milliseconds.
 * @returns The idle warning delay in milliseconds.
 */
function codexIdleWarningMs(env: NodeJS.ProcessEnv, turnTimeoutMs: number): number {
  const overrides = [env.AGENTRUN_CODEX_STDIO_IDLE_WARNING_MS, env.AGENTRUN_CODEX_IDLE_WARNING_MS];
  for (const raw of overrides) {
    // Number(undefined) is NaN and Number("") is 0, so unset and empty values are both rejected here.
    const configured = Number(raw);
    if (Number.isFinite(configured) && configured > 0) return Math.max(250, Math.floor(configured));
  }
  if (turnTimeoutMs > defaultIdleWarningMs) return defaultIdleWarningMs;
  return Math.max(250, Math.floor(turnTimeoutMs / 2));
}
/**
 * Emits the OTel spans derived from one codex notification.
 *
 * Always emits `codex_stdio.notification`, then, depending on the message:
 * `codex_stdio.assistant_delta`, `codex_stdio.tool_call.<started|completed|failed>`,
 * `codex_stdio.turn_completed` and `codex_stdio.provider_stream_disconnected`.
 *
 * @param state - Turn state captured by the caller; merged under the notification attributes.
 */
function emitCodexNotificationOtel(options: CodexStdioTurnOptions, env: NodeJS.ProcessEnv, message: JsonRecord, state: JsonRecord): void {
  const attributes = { ...state, ...notificationOtelAttributes(message) };
  const method = String(attributes.method ?? "unknown");

  emitCodexOtelSpan("codex_stdio.notification", options, env, attributes);
  if (method === "item/agentMessage/delta") {
    emitCodexOtelSpan("codex_stdio.assistant_delta", options, env, attributes);
  }

  const tool = toolCallSummaryFromNotification(message);
  if (tool) {
    // Anything that is neither "failed" nor "started" is reported as completed.
    let phase: "failed" | "started" | "completed" = "completed";
    if (tool.status === "failed") phase = "failed";
    else if (tool.status === "started") phase = "started";
    const toolFailed = phase === "failed";
    emitCodexOtelSpan(`codex_stdio.tool_call.${phase}`, options, env, { ...state, ...tool }, { status: toolFailed ? "error" : "ok", error: toolFailed ? "tool call failed" : undefined });
  }

  const isTurnCompleted = method === "turn/completed";
  if (!isTurnCompleted && method !== "error") return;

  const failureKind = typeof attributes.failureKind === "string" ? attributes.failureKind : null;
  if (isTurnCompleted) {
    emitCodexOtelSpan("codex_stdio.turn_completed", options, env, attributes, { status: failureKind ? "error" : "ok", error: failureKind ?? undefined });
  }
  if (failureKind === "provider-stream-disconnected") {
    emitCodexOtelSpan("codex_stdio.provider_stream_disconnected", options, env, attributes, { status: "error", error: failureKind });
  }
}
/**
 * Extracts span attributes from a codex notification: method, item and turn identifiers and
 * statuses, a classified failure kind, the retry flag and the delta length. Only the length of
 * `params.delta` is reported, never the text itself.
 *
 * A failure kind is classified for `error` notifications and for `turn/completed` notifications
 * whose turn status is not `completed`; otherwise it is null.
 */
function notificationOtelAttributes(message: JsonRecord): JsonRecord {
  const params = asRecordAt(message, "params");
  const item = asRecordAt(params, "item");
  const turn = asRecordAt(params, "turn");
  const error = asRecordAt(params, "error");

  const method = typeof message.method === "string" ? message.method : "unknown";
  const itemType = typeof item.type === "string" ? item.type : null;
  const turnStatus = typeof turn.status === "string" ? turn.status : null;

  let failureKind: ReturnType<typeof classifyCodexErrorRecord> | null = null;
  if (method === "error") {
    failureKind = classifyCodexErrorRecord(error, "backend-failed");
  } else if (method === "turn/completed" && turnStatus !== "completed") {
    // Without an error record, classify from the turn status so the span still carries a kind.
    const errorRecord = Object.keys(error).length > 0 ? error : { message: turnStatus ?? "unknown" };
    failureKind = classifyCodexErrorRecord(errorRecord, "backend-failed");
  }

  const itemId = stringAt(item, "id") ?? stringAt(params, "itemId");
  const itemStatus = typeof item.status === "string" ? item.status : null;
  const turnId = stringAt(turn, "id");
  const willRetry = typeof params.willRetry === "boolean" ? params.willRetry : null;
  const deltaChars = typeof params.delta === "string" ? params.delta.length : null;

  return { method, itemId, itemType, itemStatus, turnStatus, turnId, failureKind, willRetry, deltaChars, valuesPrinted: false };
}
/**
 * Summarises a tool-call item notification for telemetry.
 *
 * Returns null unless the message is `item/started` or `item/completed` for an item type accepted
 * by `isVisibleCodexToolItemType`. The command itself is never included, only a short hash of it.
 */
function toolCallSummaryFromNotification(message: JsonRecord): JsonRecord | null {
  const method = typeof message.method === "string" ? message.method : "";
  const isItemLifecycle = method === "item/started" || method === "item/completed";
  if (!isItemLifecycle) return null;

  const item = asRecordAt(asRecordAt(message, "params"), "item");
  const itemType = typeof item.type === "string" ? item.type : "unknown";
  if (!isVisibleCodexToolItemType(itemType)) return null;

  const toolName = toolCallName(item, itemType);
  const command = toolCallCommandSummary(item, itemType, toolName);
  const exitCode = typeof item.exitCode === "number" ? item.exitCode : null;
  const durationMs = typeof item.durationMs === "number" ? item.durationMs : null;

  return {
    method,
    itemId: stringAt(item, "id"),
    itemType,
    toolName,
    status: toolCallStatus(method, item),
    exitCode,
    durationMs,
    commandFingerprint: command ? shortHash(command) : null,
    valuesPrinted: false,
  };
}
/**
 * Derives the "waiting for" label that applies after a notification has been processed.
 *
 * @param message - The notification just received.
 * @param terminal - Whether the notification produced a terminal result for the turn.
 * @returns A short label such as `tool-call`, `turn/completed` or `provider-retry`.
 */
function waitingForAfterNotification(message: JsonRecord, terminal: boolean): string {
  if (terminal) return "terminal";

  // Tool-call lifecycle takes precedence over the plain method mapping below.
  const toolStatus = toolCallSummaryFromNotification(message)?.status;
  if (toolStatus === "started") return "tool-call";
  if (toolStatus === "completed" || toolStatus === "failed") return "turn/completed";

  switch (typeof message.method === "string" ? message.method : "unknown") {
    case "item/agentMessage/delta":
      return "assistant-delta";
    case "turn/started":
    case "turn/start":
      return "turn/completed";
    case "error":
      return asRecordAt(message, "params").willRetry === true ? "provider-retry" : "terminal";
    default:
      return "codex-notification";
  }
}
function envSummary(env: NodeJS.ProcessEnv): JsonRecord {
const keyState: Record<string, JsonValue> = {};
for (const key of childEnvSummaryKeys) keyState[key] = { present: typeof env[key] === "string" && String(env[key]).length > 0 };
+9 -3
View File
@@ -29,7 +29,7 @@ export function agentRunOtelTraceContext(run: RunRecord | null | undefined, comm
return { businessTraceId, traceId, parentSpanId, traceparent: `00-${traceId}-${parentSpanId}-01`, valuesPrinted: false };
}
export async function emitAgentRunOtelSpan(name: string, run: RunRecord | null | undefined, env: NodeJS.ProcessEnv = process.env, options: { command?: CommandRecord | null; startTimeMs?: number; endTimeMs?: number; status?: "ok" | "error"; error?: unknown; attributes?: JsonRecord; kind?: number } = {}): Promise<JsonRecord> {
export async function emitAgentRunOtelSpan(name: string, run: RunRecord | null | undefined, env: NodeJS.ProcessEnv = process.env, options: { command?: CommandRecord | null; startTimeMs?: number; endTimeMs?: number; status?: "ok" | "error"; error?: unknown; attributes?: JsonRecord; kind?: number; scopeName?: string } = {}): Promise<JsonRecord> {
const endpoint = resolveOtlpTracesEndpoint(env);
if (!endpoint || typeof fetch !== "function") return { ok: false, skipped: true, reason: "otlp-endpoint-missing", valuesPrinted: false };
const context = agentRunOtelTraceContext(run, options.command ?? null);
@@ -37,11 +37,12 @@ export async function emitAgentRunOtelSpan(name: string, run: RunRecord | null |
const endedAtMs = Number.isFinite(Number(options.endTimeMs)) ? Number(options.endTimeMs) : Date.now();
const spanId = nonZeroHex(randomBytes(8).toString("hex"), ZERO_SPAN_ID);
const statusCode = options.status === "error" || options.error ? 2 : 1;
const resource = resourceAttributes(env, run);
const body = {
resourceSpans: [{
resource: { attributes: attributesFromRecord(resourceAttributes(env, run)) },
resource: { attributes: attributesFromRecord(resource) },
scopeSpans: [{
scope: { name: "agentrun.manager", version: "1" },
scope: { name: options.scopeName ?? scopeNameFromResource(resource), version: "1" },
spans: [{
traceId: context.traceId,
spanId,
@@ -58,6 +59,7 @@ export async function emitAgentRunOtelSpan(name: string, run: RunRecord | null |
commandId: options.command?.id ?? null,
sessionId: run?.sessionRef?.sessionId ?? null,
...options.attributes,
valuesPrinted: false,
}),
status: {
code: statusCode,
@@ -102,6 +104,10 @@ function resourceAttributes(env: NodeJS.ProcessEnv, run: RunRecord | null | unde
};
}
/**
 * Picks the instrumentation scope name from the resource attributes: `agentrun.runner` when
 * `service.name` is `agentrun-runner`, `agentrun.manager` otherwise.
 */
function scopeNameFromResource(resource: JsonRecord): string {
  const isRunnerService = resource["service.name"] === "agentrun-runner";
  if (isRunnerService) return "agentrun.runner";
  return "agentrun.manager";
}
function attributesFromRecord(record: JsonRecord): Array<{ key: string; value: JsonRecord }> {
return Object.entries(record)
.filter(([, value]) => value !== undefined && value !== null)
+4
View File
@@ -198,6 +198,7 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore;
mutation: true,
runId: run.id,
commandId,
runnerJobId: render.runnerJobId,
attemptId: render.attemptId,
runnerId: render.runnerId,
namespace: render.namespace,
@@ -213,6 +214,7 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore;
runner: {
runId: run.id,
commandId,
runnerJobId: render.runnerJobId,
attemptId: render.attemptId,
runnerId: render.runnerId,
backendProfile: run.backendProfile,
@@ -250,6 +252,7 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore;
},
} satisfies JsonRecord;
const saved = await options.store.saveRunnerJob({
id: render.runnerJobId,
runId: run.id,
commandId,
idempotencyKey: idempotencyKey ?? null,
@@ -267,6 +270,7 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore;
await options.store.appendEvent(run.id, "backend_status", {
phase: "runner-job-created",
commandId,
runnerJobId: saved.id,
attemptId: saved.attemptId,
runnerId: saved.runnerId,
namespace: saved.namespace,
+1 -1
View File
@@ -571,7 +571,7 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
}
}
const at = nowIso();
const record: RunnerJobRecord = { ...input, id: newId("rjob"), idempotencyKey: input.idempotencyKey ?? null, serviceAccountName: input.serviceAccountName ?? null, createdAt: at, updatedAt: at };
const record: RunnerJobRecord = { ...input, id: input.id ?? newId("rjob"), idempotencyKey: input.idempotencyKey ?? null, serviceAccountName: input.serviceAccountName ?? null, createdAt: at, updatedAt: at };
const inserted = await client.query(
`INSERT INTO agentrun_runner_jobs (id, run_id, command_id, idempotency_key, payload_hash, attempt_id, runner_id, namespace, job_name, manager_url, image, source_commit, service_account_name, result, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14::jsonb, $15, $16)
+2 -1
View File
@@ -89,6 +89,7 @@ export interface UpdateQueueTaskAttemptInput {
}
export interface SaveRunnerJobInput {
id?: string;
runId: string;
commandId: string;
idempotencyKey?: string | null;
@@ -213,7 +214,7 @@ export class MemoryAgentRunStore implements AgentRunStore {
if (existing) return existing;
}
const at = nowIso();
const record: RunnerJobRecord = { ...input, id: newId("rjob"), idempotencyKey: input.idempotencyKey ?? null, serviceAccountName: input.serviceAccountName ?? null, createdAt: at, updatedAt: at };
const record: RunnerJobRecord = { ...input, id: input.id ?? newId("rjob"), idempotencyKey: input.idempotencyKey ?? null, serviceAccountName: input.serviceAccountName ?? null, createdAt: at, updatedAt: at };
this.runnerJobs.set(record.id, record);
return record;
}
+42 -5
View File
@@ -1,4 +1,4 @@
import { stableHash } from "../common/validation.js";
import { newId, stableHash } from "../common/validation.js";
import type { BackendProfile, ExecutionPolicy, JsonRecord, JsonValue, RunRecord, SecretRef } from "../common/types.js";
import { backendProfileSpec } from "../common/backend-profiles.js";
import { staticWorkReadyCapabilitySummary } from "../common/work-ready.js";
@@ -42,6 +42,7 @@ const defaultRunnerNoProxyItems = [
export interface RunnerJobRenderOptions {
run: RunRecord;
commandId: string;
runnerJobId?: string;
managerUrl: string;
image: string;
bootRepoUrl?: string;
@@ -148,10 +149,11 @@ export function renderRunnerJobDryRun(options: RunnerJobRenderOptions): JsonReco
};
}
export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { manifest: JsonRecord; namespace: string; jobName: string; runnerId: string; attemptId: string; sourceCommit: string; serviceAccountName: string; secretRefs: CredentialProjection[]; toolCredentials: ToolCredentialProjection[]; warnings: string[]; ttlSecondsAfterFinished: number; runnerIdleTimeoutMs: number } {
export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { manifest: JsonRecord; namespace: string; jobName: string; runnerJobId: string; runnerId: string; attemptId: string; sourceCommit: string; serviceAccountName: string; secretRefs: CredentialProjection[]; toolCredentials: ToolCredentialProjection[]; warnings: string[]; ttlSecondsAfterFinished: number; runnerIdleTimeoutMs: number } {
const namespace = options.namespace ?? "agentrun-v01";
const attemptId = options.attemptId ?? `attempt_${Date.now().toString(36)}`;
const runnerId = options.runnerId ?? `runner_${shortHash(`${options.run.id}:${attemptId}:${options.commandId}`)}`;
const runnerJobId = options.runnerJobId ?? newId("rjob");
const sourceCommit = options.sourceCommit ?? process.env.AGENTRUN_SOURCE_COMMIT ?? "unknown";
const serviceAccountName = options.serviceAccountName ?? "agentrun-v01-runner";
const jobNamePrefix = normalizeJobNamePrefix(options.jobNamePrefix ?? serviceAccountName);
@@ -164,7 +166,7 @@ export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { mani
const sessionPvc = options.sessionPvc;
const warnings: string[] = [];
if (secretRefs.length === 0) warnings.push("run executionPolicy.secretScope 未声明 provider SecretRefrunner 将按 secret-unavailable 上报,而不会降级直连外部凭据");
const env = runnerEnv(options, { namespace, jobName, runnerId, attemptId, sourceCommit, secretRefs, toolCredentials, sessionPvc, runnerIdleTimeoutMs });
const env = runnerEnv(options, { namespace, jobName, runnerJobId, runnerId, attemptId, sourceCommit, secretRefs, toolCredentials, sessionPvc, runnerIdleTimeoutMs });
const manifest: JsonRecord = {
apiVersion: "batch/v1",
kind: "Job",
@@ -227,10 +229,10 @@ export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { mani
},
},
};
return { manifest, namespace, jobName, runnerId, attemptId, sourceCommit, serviceAccountName, secretRefs, toolCredentials, warnings, ttlSecondsAfterFinished, runnerIdleTimeoutMs };
return { manifest, namespace, jobName, runnerJobId, runnerId, attemptId, sourceCommit, serviceAccountName, secretRefs, toolCredentials, warnings, ttlSecondsAfterFinished, runnerIdleTimeoutMs };
}
function runnerEnv(options: RunnerJobRenderOptions, context: { namespace: string; jobName: string; runnerId: string; attemptId: string; sourceCommit: string; secretRefs: CredentialProjection[]; toolCredentials: ToolCredentialProjection[]; sessionPvc: RunnerSessionPvcOptions | undefined; runnerIdleTimeoutMs: number }): JsonRecord[] {
function runnerEnv(options: RunnerJobRenderOptions, context: { namespace: string; jobName: string; runnerJobId: string; runnerId: string; attemptId: string; sourceCommit: string; secretRefs: CredentialProjection[]; toolCredentials: ToolCredentialProjection[]; sessionPvc: RunnerSessionPvcOptions | undefined; runnerIdleTimeoutMs: number }): JsonRecord[] {
const selectedSecret = context.secretRefs.find((item) => item.profile === options.run.backendProfile);
const codexHome = selectedSecret?.runtimeMountPath ?? defaultRuntimeHome(options.run.backendProfile);
const bootRepoUrl = optionalString(options.bootRepoUrl) ?? defaultBootRepoUrl;
@@ -239,6 +241,7 @@ function runnerEnv(options: RunnerJobRenderOptions, context: { namespace: string
{ name: "AGENTRUN_API_KEY", valueFrom: { secretKeyRef: { name: "agentrun-v01-api-key", key: "HWLAB_API_KEY" } } },
{ name: "AGENTRUN_RUN_ID", value: options.run.id },
{ name: "AGENTRUN_COMMAND_ID", value: options.commandId },
{ name: "AGENTRUN_RUNNER_JOB_ID", value: context.runnerJobId },
{ name: "AGENTRUN_ATTEMPT_ID", value: context.attemptId },
{ name: "AGENTRUN_RUNNER_ID", value: context.runnerId },
{ name: "AGENTRUN_BACKEND_PROFILE", value: options.run.backendProfile },
@@ -262,6 +265,7 @@ function runnerEnv(options: RunnerJobRenderOptions, context: { namespace: string
{ name: "AGENTRUN_RUNNER_POLL_INTERVAL_MS", value: "250" },
{ name: "HOME", value: "/home/agentrun" },
{ name: "CODEX_HOME", value: codexHome },
...runnerOtelEnvVars(process.env),
...(selectedSecret ? [{ name: "AGENTRUN_CODEX_SECRET_HOME", value: selectedSecret.projectionMountPath }] : []),
...(context.sessionPvc ? [
{ name: "AGENTRUN_SESSION_PVC_NAME", value: context.sessionPvc.pvcName },
@@ -359,6 +363,39 @@ function runnerEgressProxyEnvVars(): JsonRecord[] {
];
}
/**
 * Builds the OpenTelemetry-related environment entries (`{ name, value }` records)
 * derived from `env`.
 *
 * - A traces-specific endpoint (`AGENTRUN_OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`, then
 *   `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`) wins over the base endpoint; the base
 *   endpoint is only emitted when no traces endpoint is configured.
 * - Whichever endpoint is chosen is emitted under both its `OTEL_*` and
 *   `AGENTRUN_OTEL_*` names.
 * - `OTEL_SERVICE_NAME` is always set to `agentrun-runner`.
 * - `UNIDESK_NODE_ID` and `AGENTRUN_NODE_ID` mirror each other: each falls back to
 *   the other when it is unset or blank.
 *
 * @param env Environment to read the settings from.
 * @returns Environment entries; optional ones are omitted when their value is blank.
 */
function runnerOtelEnvVars(env: NodeJS.ProcessEnv): JsonRecord[] {
  const tracesEndpoint = firstNonEmpty(env.AGENTRUN_OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, env.OTEL_EXPORTER_OTLP_TRACES_ENDPOINT);
  const baseEndpoint = firstNonEmpty(env.AGENTRUN_OTEL_EXPORTER_OTLP_ENDPOINT, env.OTEL_EXPORTER_OTLP_ENDPOINT);
  // `??` only falls back on null/undefined, so an empty-string variable would hide
  // the other one; firstNonEmpty also skips blank values.
  const unideskNodeId = firstNonEmpty(env.UNIDESK_NODE_ID, env.AGENTRUN_NODE_ID) ?? undefined;
  const agentrunNodeId = firstNonEmpty(env.AGENTRUN_NODE_ID, env.UNIDESK_NODE_ID) ?? undefined;
  return [
    ...(tracesEndpoint ? [
      { name: "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", value: tracesEndpoint },
      { name: "AGENTRUN_OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", value: tracesEndpoint },
    ] : []),
    ...(!tracesEndpoint && baseEndpoint ? [
      { name: "OTEL_EXPORTER_OTLP_ENDPOINT", value: baseEndpoint },
      { name: "AGENTRUN_OTEL_EXPORTER_OTLP_ENDPOINT", value: baseEndpoint },
    ] : []),
    { name: "OTEL_SERVICE_NAME", value: "agentrun-runner" },
    ...optionalEnvVar("AGENTRUN_LANE", env.AGENTRUN_LANE),
    ...optionalEnvVar("UNIDESK_NODE_ID", unideskNodeId),
    ...optionalEnvVar("AGENTRUN_NODE_ID", agentrunNodeId),
    ...optionalEnvVar("HWLAB_RUNTIME_LANE", env.HWLAB_RUNTIME_LANE),
  ];
}
/**
 * Wraps an optional value as zero or one environment entry.
 *
 * @param name Environment variable name to emit.
 * @param value Raw value; surrounding whitespace is trimmed.
 * @returns `[{ name, value }]` with the trimmed value, or `[]` when the value is
 *          undefined or blank.
 */
function optionalEnvVar(name: string, value: string | undefined): JsonRecord[] {
  const trimmed = value?.trim();
  if (!trimmed) {
    return [];
  }
  return [{ name, value: trimmed }];
}
/**
 * Returns the first candidate that is non-blank after trimming.
 *
 * @param values Candidates in priority order; `null` and `undefined` are skipped.
 * @returns The trimmed first non-blank candidate, or `null` when there is none.
 */
function firstNonEmpty(...values: Array<string | null | undefined>): string | null {
  const match = values
    .map((candidate) => candidate?.trim())
    .find((trimmed) => Boolean(trimmed));
  return match ?? null;
}
function runnerEgressProxyUrl(env: NodeJS.ProcessEnv): string {
const value = env.AGENTRUN_RUNNER_EGRESS_PROXY_URL?.trim();
return value && value.length > 0 ? value : fallbackRunnerEgressProxyUrl;
+1
View File
@@ -15,6 +15,7 @@ const options: RunnerOnceOptions = {
runId,
};
if (process.env.AGENTRUN_COMMAND_ID) options.commandId = process.env.AGENTRUN_COMMAND_ID;
if (process.env.AGENTRUN_RUNNER_JOB_ID) options.runnerJobId = process.env.AGENTRUN_RUNNER_JOB_ID;
if (process.env.AGENTRUN_ATTEMPT_ID) options.attemptId = process.env.AGENTRUN_ATTEMPT_ID;
if (process.env.AGENTRUN_RUNNER_ID) options.runnerId = process.env.AGENTRUN_RUNNER_ID;
if (process.env.AGENTRUN_BACKEND_PROFILE) {
+74 -2
View File
@@ -1,3 +1,5 @@
import { appendFile, mkdir, writeFile } from "node:fs/promises";
import path from "node:path";
import { RunnerManagerApi, failureKindFromError, terminalStatusForFailure, errorMessage } from "./manager-api.js";
import { createBackendSession, runBackendTurn, type BackendActiveTurnControl, type BackendAdapterOptions, type BackendSession } from "../backend/adapter.js";
import { materializeResourceBundle } from "./resource-bundle.js";
@@ -9,6 +11,7 @@ export interface RunnerOnceOptions extends BackendAdapterOptions {
managerUrl: string;
runId: string;
commandId?: string;
runnerJobId?: string;
runnerId?: string;
attemptId?: string;
leaseMs?: number;
@@ -38,7 +41,13 @@ interface RunnerFailure {
details?: JsonRecord | null;
}
/**
 * Destination for structured runner log records.
 */
interface RunnerLogSink {
/**
 * Records one entry.
 *
 * @param label Name identifying the record (e.g. `"runner.starting"`).
 * @param payload Structured fields stored alongside the label.
 * @returns A promise that settles once the entry has been handled.
 */
write(label: string, payload: JsonRecord): Promise<void>;
}
export async function runOnce(options: RunnerOnceOptions): Promise<JsonRecord> {
const runnerLog = await createRunnerLogSink(options.logPath);
await runnerLog.write("runner.starting", { runId: options.runId, commandId: options.commandId ?? null, runnerJobId: options.runnerJobId ?? null, valuesPrinted: false });
const api = new RunnerManagerApi(options.managerUrl);
const targetRun = await api.getRun(options.runId);
if (isTerminalRun(targetRun)) return { terminalStatus: targetRun.terminalStatus, failureKind: targetRun.failureKind, run: targetRun, skipped: "run-terminal" } as JsonRecord;
@@ -54,10 +63,12 @@ export async function runOnce(options: RunnerOnceOptions): Promise<JsonRecord> {
placement: options.placement ?? "host-process",
sourceCommit: options.sourceCommit ?? process.env.AGENTRUN_SOURCE_COMMIT ?? "unknown",
...(options.runnerId ? { runnerId: options.runnerId } : {}),
...(options.runnerJobId ? { runnerJobId: options.runnerJobId } : {}),
...(options.jobName ? { jobName: options.jobName } : {}),
...(options.podName ? { podName: options.podName } : {}),
...(options.logPath ? { logPath: options.logPath } : {}),
}) as RunnerRecord;
await runnerLog.write("runner.registered", { runId: options.runId, runnerId: runner.id, runnerJobId: options.runnerJobId ?? null, attemptId, valuesPrinted: false });
try {
const imageWorkReady = await smokeImageWorkReadyCapabilities(options.env ?? process.env);
await api.appendEvent(options.runId, { type: "backend_status", payload: { phase: "runner-image-work-ready-smoke", attemptId, runnerId: runner.id, ...imageWorkReady } });
@@ -133,7 +144,7 @@ export async function runOnce(options: RunnerOnceOptions): Promise<JsonRecord> {
const result = materializationFailure
? await reportCommandFailure(api, options.runId, command.id, runner, attemptId, materializationFailure, "runner:resource-bundle", { terminalRun: true })
: await executeCommand(api, withResourceAssembly(options, resourceEnv, initialPrompt), command, runner, attemptId, workspacePath, backendSession ?? (backendSession = createBackendSession(currentRun, withResourceAssembly(options, resourceEnv, initialPrompt))));
: await executeCommand(api, withResourceAssembly(options, resourceEnv, initialPrompt), command, runner, attemptId, workspacePath, backendSession ?? (backendSession = createBackendSession(currentRun, withResourceAssembly(options, resourceEnv, initialPrompt))), runnerLog);
commandResults.push(result);
idleSince = Date.now();
if (options.oneShot === true) {
@@ -213,12 +224,13 @@ function pathDelimiter(): string {
return process.platform === "win32" ? ";" : ":";
}
async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions, command: CommandRecord, runner: RunnerRecord, attemptId: string, workspacePath: string | undefined, backendSession: BackendSession | null): Promise<CommandExecutionResult> {
async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions, command: CommandRecord, runner: RunnerRecord, attemptId: string, workspacePath: string | undefined, backendSession: BackendSession | null, runnerLog: RunnerLogSink): Promise<CommandExecutionResult> {
await api.ackCommand(command.id);
const acked = await api.getCommand(options.runId, command.id);
if (acked.state === "cancelled") return await reportCancelled(api, options.runId, command.id, runner, attemptId, "command cancelled before backend start");
await assertNotCancelled(api, options.runId, command.id);
await api.appendEvent(options.runId, { type: "backend_status", payload: { phase: "backend-turn-started", commandId: command.id, attemptId, runnerId: runner.id, backendProfile: options.backendProfile ?? null, workspaceReady: Boolean(workspacePath) } });
await runnerLog.write("command.started", { runId: options.runId, commandId: command.id, runnerId: runner.id, runnerJobId: options.runnerJobId ?? null, attemptId, backendProfile: options.backendProfile ?? null, valuesPrinted: false });
const abortController = new AbortController();
const stopCancelWatch = watchCancellation(api, options.runId, command.id, abortController);
const backendProgress = startBackendProgress();
@@ -229,7 +241,19 @@ async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions,
...options,
...(workspacePath ? { workspacePath } : {}),
abortSignal: abortController.signal,
otelContext: {
run: latestRun,
command,
attemptId,
runnerId: runner.id,
runnerJobId: options.runnerJobId ?? null,
jobName: options.jobName ?? null,
podName: options.podName ?? null,
sourceCommit: options.sourceCommit ?? null,
logPath: options.logPath ?? null,
},
onEvent: async (event: BackendEvent) => {
await runnerLog.write("backend.event", runnerLogEventSummary(event, options.runId, command.id, attemptId, runner.id, options.runnerJobId ?? null));
await appendBestEffort(api, options.runId, annotateCommandEvent(event, command.id, attemptId, runner.id));
},
onActiveTurn: (control: BackendActiveTurnControl) => {
@@ -245,6 +269,7 @@ async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions,
await api.appendEvent(options.runId, annotateCommandEvent(event, command.id, attemptId, runner.id));
}
await api.reportCommandStatus(command.id, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage, ...(result.threadId ? { threadId: result.threadId } : {}), ...(result.turnId ? { turnId: result.turnId } : {}) });
await runnerLog.write("command.terminal", { runId: options.runId, commandId: command.id, runnerId: runner.id, runnerJobId: options.runnerJobId ?? null, attemptId, terminalStatus: result.terminalStatus, failureKind: result.failureKind, valuesPrinted: false });
return { commandId: command.id, terminalStatus: result.terminalStatus, failureKind: result.failureKind } as CommandExecutionResult;
} catch (error) {
const failureKind = failureKindFromError(error);
@@ -537,3 +562,50 @@ function normalizePollIntervalMs(value: number | undefined): number {
if (!Number.isFinite(value ?? NaN)) return 250;
return Math.max(50, Math.min(2_000, Math.floor(value!)));
}
/**
 * Creates the local, diagnostic-only runner log sink.
 *
 * When `logPath` is missing or blank, or the log file cannot be prepared (parent
 * directory creation or the initial append-mode touch fails), the returned sink
 * silently discards every record. Otherwise each record is appended to the file
 * as one JSON line containing `at` (ISO timestamp), `label`, the payload fields,
 * and `valuesPrinted: false` (always forced, regardless of the payload).
 *
 * @param logPath Target file path; surrounding whitespace is trimmed.
 * @returns A sink whose `write` never rejects.
 */
async function createRunnerLogSink(logPath: string | undefined): Promise<RunnerLogSink> {
  const discard: RunnerLogSink = { write: async () => undefined };
  const target = logPath?.trim();
  if (!target) return discard;

  try {
    await mkdir(path.dirname(target), { recursive: true });
    // Touch the file in append mode so an unwritable path is detected up front.
    await writeFile(target, "", { flag: "a" });
  } catch {
    return discard;
  }

  const write = async (label: string, payload: JsonRecord): Promise<void> => {
    try {
      const entry = { at: new Date().toISOString(), label, ...payload, valuesPrinted: false };
      await appendFile(target, `${JSON.stringify(entry)}\n`, "utf8");
    } catch {
      // Local runner log is diagnostic-only; manager events and terminal state remain authoritative.
    }
  };
  return { write };
}
/**
 * Reduces a backend event to a summary record for the local runner log.
 *
 * Only identifiers and a fixed set of string fields are copied from the event
 * payload (`phase`, `terminalStatus`, `failureKind`, `threadId`, `turnId`,
 * `itemId`, `type` as `itemType`, `method`); each is `null` when absent, not a
 * string, or blank. No other payload content is carried over.
 *
 * @param event Backend event to summarize; a missing payload is treated as empty.
 * @param runId Run the event belongs to.
 * @param commandId Command the event belongs to.
 * @param attemptId Runner attempt identifier.
 * @param runnerId Runner identifier.
 * @param runnerJobId Runner job identifier, or `null` when unknown.
 * @returns The summary record, always carrying `valuesPrinted: false`.
 */
function runnerLogEventSummary(event: BackendEvent, runId: string, commandId: string, attemptId: string, runnerId: string, runnerJobId: string | null): JsonRecord {
  const source = event.payload ?? {};
  const field = (key: string): string | null => stringPayload(source, key);
  const identity = { runId, commandId, runnerId, runnerJobId, attemptId };
  return {
    ...identity,
    eventType: event.type,
    phase: field("phase"),
    terminalStatus: field("terminalStatus"),
    failureKind: field("failureKind"),
    threadId: field("threadId"),
    turnId: field("turnId"),
    itemId: field("itemId"),
    itemType: field("type"),
    method: field("method"),
    valuesPrinted: false,
  };
}
/**
 * Reads a string field from a record.
 *
 * @param record Record to read from.
 * @param key Field name.
 * @returns The trimmed value when it is a non-blank string, otherwise `null`.
 */
function stringPayload(record: JsonRecord, key: string): string | null {
  const raw = record[key];
  if (typeof raw !== "string") {
    return null;
  }
  const trimmed = raw.trim();
  return trimmed.length > 0 ? trimmed : null;
}