From e53e7c2c3cc1603e13661c2b673c3e1f4bc8fe54 Mon Sep 17 00:00:00 2001 From: lyon Date: Sat, 20 Jun 2026 15:32:46 +0800 Subject: [PATCH] fix: trace codex stdio lifecycle --- src/backend/adapter.ts | 14 +- src/backend/codex-stdio.ts | 292 +++++++++++++++++++++++++++++-- src/common/otel-trace.ts | 12 +- src/mgr/kubernetes-runner-job.ts | 4 + src/mgr/postgres-store.ts | 2 +- src/mgr/store.ts | 3 +- src/runner/k8s-job.ts | 47 ++++- src/runner/main.ts | 1 + src/runner/run-once.ts | 76 +++++++- 9 files changed, 423 insertions(+), 28 deletions(-) diff --git a/src/backend/adapter.ts b/src/backend/adapter.ts index d7e82a5..a5d93f7 100644 --- a/src/backend/adapter.ts +++ b/src/backend/adapter.ts @@ -1,4 +1,4 @@ -import type { BackendEvent, BackendTurnResult, CommandRecord, InitialPromptAssembly, RunRecord } from "../common/types.js"; +import type { BackendEvent, BackendTurnResult, CommandRecord, InitialPromptAssembly, JsonRecord, RunRecord } from "../common/types.js"; import { CodexStdioBackendSession, runCodexStdioTurn, type CodexStdioTurnOptions } from "./codex-stdio.js"; import { backendProfileSpec } from "../common/backend-profiles.js"; @@ -16,6 +16,17 @@ export interface BackendAdapterOptions { workspacePath?: string; abortSignal?: AbortSignal; initialPrompt?: InitialPromptAssembly; + otelContext?: { + run: RunRecord; + command: CommandRecord; + attemptId?: string | null; + runnerId?: string | null; + runnerJobId?: string | null; + jobName?: string | null; + podName?: string | null; + sourceCommit?: string | null; + logPath?: string | null; + } & JsonRecord; onEvent?: (event: BackendEvent) => void | Promise; onActiveTurn?: (control: BackendActiveTurnControl) => void | (() => void); env?: NodeJS.ProcessEnv; @@ -64,6 +75,7 @@ export function backendTurnOptions(run: RunRecord, command: CommandRecord, optio if (options.codexArgs) turnOptions.args = options.codexArgs; if (options.env) turnOptions.env = options.env; if (options.initialPrompt) turnOptions.initialPrompt = options.initialPrompt; + if (options.otelContext) turnOptions.otelContext = options.otelContext; if (options.codexHome) turnOptions.codexHome = options.codexHome; if (options.abortSignal) turnOptions.abortSignal = options.abortSignal; if (options.onEvent) turnOptions.onEvent = options.onEvent; diff --git a/src/backend/codex-stdio.ts b/src/backend/codex-stdio.ts index ef28bbc..11e2a21 100644 --- a/src/backend/codex-stdio.ts +++ b/src/backend/codex-stdio.ts @@ -4,10 +4,11 @@ import { accessSync, constants as fsConstants, readdirSync, readFileSync } from import { chmod, copyFile, mkdir } from "node:fs/promises"; import path from "node:path"; import * as readline from "node:readline"; -import type { BackendEvent, BackendProfile, BackendTurnResult, FailureKind, InitialPromptAssembly, JsonRecord, JsonValue, TerminalStatus } from "../common/types.js"; +import type { BackendEvent, BackendProfile, BackendTurnResult, CommandRecord, FailureKind, InitialPromptAssembly, JsonRecord, JsonValue, RunRecord, TerminalStatus } from "../common/types.js"; import { redactJson, redactText } from "../common/redaction.js"; import { backendProfileSpec } from "../common/backend-profiles.js"; import { boundedTextSummary, commandOutputPayload } from "../common/output.js"; +import { emitAgentRunOtelSpan } from "../common/otel-trace.js"; const codexProtocol = "codex-app-server-jsonrpc-stdio"; const defaultCodexArgs = ["app-server", "--listen", "stdio://"]; @@ -16,6 +17,7 @@ const stderrEventChars = 4_000; const requestTimeoutCapMs = 30_000; const assistantDeltaProgressMinChars = 500; const assistantDeltaProgressLimitChars = 1_200; +const defaultIdleWarningMs = 8_000; const childEnvSummaryKeys = [ "CODEX_HOME", @@ -53,11 +55,31 @@ export interface CodexStdioTurnOptions { env?: NodeJS.ProcessEnv; codexHome?: string; initialPrompt?: InitialPromptAssembly; + otelContext?: CodexStdioOtelContext; abortSignal?: AbortSignal; onEvent?: (event: BackendEvent) => void | Promise; onActiveTurn?: (control: CodexActiveTurnControl) => void | (() => void); } +export interface CodexStdioOtelContext extends JsonRecord { + run: RunRecord; + command: CommandRecord; + attemptId?: string | null; + runnerId?: string | null; + runnerJobId?: string | null; + jobName?: string | null; + podName?: string | null; + sourceCommit?: string | null; + logPath?: string | null; +} + +interface CodexLifecycleOtelContext { + env: NodeJS.ProcessEnv; + run: RunRecord; + command: CommandRecord; + attributes: JsonRecord; +} + export interface CodexActiveTurnControl { threadId: string; turnId: string; @@ -331,6 +353,7 @@ export async function runCodexStdioTurn(options: CodexStdioTurnOptions): Promise export class CodexStdioBackendSession { private client: CodexStdioClient | null = null; private clientKey: string | null = null; + private lifecycleOtel: CodexLifecycleOtelContext | null = null; async runTurn(options: CodexStdioTurnOptions): Promise { return await runCodexStdioTurnWithSession(options, this); @@ -343,13 +366,17 @@ export class CodexStdioBackendSession { this.clientKey = null; client.stop(); const closeInfo = await client.closedPromise; + emitCodexOtelSpanFromLifecycle("codex_app_server.exit", this.lifecycleOtel, { status: closeInfo.failureKind ? "error" : "ok", error: closeInfo.message ?? closeInfo.failureKind ?? undefined, attributes: { ...closeEventAttributes(closeInfo), failureKind: closeInfo.failureKind } }); + this.lifecycleOtel = null; return [{ type: "backend_status", payload: { phase: "codex-app-server-closed", appServerExit: closeEvent(closeInfo) } }]; } async getClient(options: CodexStdioTurnOptions, env: NodeJS.ProcessEnv, emitEvent: (event: BackendEvent) => void): Promise { const key = codexClientKey(options, env); + this.lifecycleOtel = codexLifecycleOtelContext(options, env); if (this.client && !this.client.isClosed && this.clientKey === key) { emitEvent({ type: "backend_status", payload: { phase: "codex-app-server:reused", ...backendMetadata(options), protocol: codexProtocol } }); + emitCodexOtelSpan("codex_app_server.reused", options, env, { command: options.command ?? "codex", argsFingerprint: shortHash(JSON.stringify(options.args ?? defaultCodexArgs)) }); return this.client; } const closeEvents = await this.close(); @@ -364,6 +391,8 @@ export class CodexStdioBackendSession { config: codexConfigSummary(resolveCodexHome(options), options.backendProfile ?? "codex"), }, }); + const appServerStartMs = Date.now(); + emitCodexOtelSpan("codex_app_server.starting", options, env, { command: options.command ?? "codex", argsFingerprint: shortHash(JSON.stringify(options.args ?? defaultCodexArgs)), cwd: pathSummary(options.cwd), codexHome: pathSummary(resolveCodexHome(options)) }, { startTimeMs: appServerStartMs }); const clientOptions: ConstructorParameters[0] = { cwd: options.cwd, env, @@ -371,14 +400,20 @@ export class CodexStdioBackendSession { }; if (options.command) clientOptions.command = options.command; if (options.args) clientOptions.args = options.args; - this.client = new CodexStdioClient(clientOptions); - this.clientKey = key; - const requestTimeoutMs = Math.min(positiveTimeout(options.timeoutMs), requestTimeoutCapMs); - const initializeResult = requireResponseRecord(await this.client.request("initialize", { clientInfo: { name: "agentrun", title: "AgentRun", version: "0.1.0" }, capabilities: { experimentalApi: true } }, requestTimeoutMs), "initialize"); - validateInitializeResponse(initializeResult); - this.client.notify("initialized", {}); - emitEvent({ type: "backend_status", payload: { phase: "initialize:completed", ...backendMetadata(options), protocol: codexProtocol } }); - return this.client; + try { + this.client = new CodexStdioClient(clientOptions); + this.clientKey = key; + const requestTimeoutMs = Math.min(positiveTimeout(options.timeoutMs), requestTimeoutCapMs); + const initializeResult = requireResponseRecord(await this.client.request("initialize", { clientInfo: { name: "agentrun", title: "AgentRun", version: "0.1.0" }, capabilities: { experimentalApi: true } }, requestTimeoutMs), "initialize"); + validateInitializeResponse(initializeResult); + this.client.notify("initialized", {}); + emitEvent({ type: "backend_status", payload: { phase: "initialize:completed", ...backendMetadata(options), protocol: codexProtocol } }); + emitCodexOtelSpan("codex_app_server.started", options, env, { command: options.command ?? "codex", argsFingerprint: shortHash(JSON.stringify(options.args ?? defaultCodexArgs)) }, { startTimeMs: appServerStartMs, endTimeMs: Date.now() }); + return this.client; + } catch (error) { + emitCodexOtelSpan("codex_app_server.exit", options, env, { phase: "app-server-start", command: options.command ?? "codex", failureKind: normalizeFailure(error).failureKind }, { startTimeMs: appServerStartMs, endTimeMs: Date.now(), status: "error", error }); + throw error; + } } private notificationHandlers = new Set<(message: JsonRecord) => void>(); @@ -423,6 +458,11 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess const assistantDeltaProgress = createAssistantDeltaProgressState(); const completedAssistantMessages: CompletedAssistantMessage[] = []; const suppressedNotifications = createSuppressedNotificationSummary(); + let waitingFor = "codex-app-server"; + let lastNotificationMethod: string | null = null; + let lastActivityAt = Date.now(); + let lastToolCall: JsonRecord | null = null; + let missingTerminalAfterToolReported = false; let threadId: string | undefined = options.threadId; let turnId: string | undefined; let terminal: { status: TerminalStatus; failureKind: FailureKind | null; message: string | null } | null = null; @@ -491,29 +531,56 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess }; options.abortSignal?.addEventListener("abort", abortTurn, { once: true }); const turnIdleTimeoutMs = positiveTimeout(options.timeoutMs); + const idleWarningMs = codexIdleWarningMs(env, turnIdleTimeoutMs); let idleTimeout: NodeJS.Timeout | null = null; + let idleWarningTimeout: NodeJS.Timeout | null = null; + const scheduleIdleWarning = (): void => { + if (idleWarningTimeout) clearTimeout(idleWarningTimeout); + idleWarningTimeout = setTimeout(() => { + if (terminal) return; + const idleMs = Math.max(0, Date.now() - lastActivityAt); + const attrs = { waitingFor, idleMs, lastNotificationMethod, threadId: threadId ?? null, turnId: turnId ?? null, terminalStatus: null }; + emitCodexOtelSpan("codex_stdio.idle_warning", options, env, attrs); + if (lastToolCall && !missingTerminalAfterToolReported) { + missingTerminalAfterToolReported = true; + emitCodexOtelSpan("codex_stdio.missing_terminal_after_tool", options, env, { ...attrs, lastToolCall }); + } + }, idleWarningMs); + idleWarningTimeout.unref?.(); + }; const refreshTurnActivity = (): void => { if (terminal) return; + lastActivityAt = Date.now(); + scheduleIdleWarning(); if (idleTimeout) clearTimeout(idleTimeout); idleTimeout = setTimeout(() => { if (terminal) return; terminal = { status: "failed", failureKind: "backend-timeout", message: `codex stdio turn idle timed out after ${turnIdleTimeoutMs}ms without activity` }; emitEvent({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "turn:idle-timeout" } }); + emitCodexOtelSpan("codex_stdio.idle_timeout", options, env, { waitingFor, idleMs: Math.max(0, Date.now() - lastActivityAt), lastNotificationMethod, threadId: threadId ?? null, turnId: turnId ?? null, terminalStatus: terminal.status, failureKind: terminal.failureKind }, { status: "error", error: terminal.message }); beginInterruptAndStop("idle timeout", "turn:idle-timeout"); terminalResolve(); }, turnIdleTimeoutMs); + idleTimeout.unref?.(); }; const stopTurnIdleTimeout = (): void => { if (!idleTimeout) return; clearTimeout(idleTimeout); idleTimeout = null; + if (idleWarningTimeout) clearTimeout(idleWarningTimeout); + idleWarningTimeout = null; }; refreshTurnActivity(); const stopNotifications = session.addNotificationHandler((message) => { refreshTurnActivity(); + lastNotificationMethod = typeof message.method === "string" ? message.method : "unknown"; + emitCodexNotificationOtel(options, env, message, { threadId: threadId ?? null, turnId: turnId ?? null, waitingFor }); const normalized = normalizeCodexNotification(message, suppressedNotifications); if (normalized.threadId) threadId = normalized.threadId; if (normalized.turnId) turnId = normalized.turnId; + waitingFor = waitingForAfterNotification(message, normalized.terminal !== undefined); + const toolSummary = toolCallSummaryFromNotification(message); + if (toolSummary?.status === "completed" || toolSummary?.status === "failed") lastToolCall = toolSummary; exposeActiveTurn(normalized.turnId ? "turn-notification" : "notification"); emitEvents(normalized.events); if (normalized.assistantDelta) { @@ -534,9 +601,19 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess client = await session.getClient(options, env, emitEvent); const startThread = async (phasePrefix = "thread/start"): Promise => { - const response = requireResponseRecord(await client!.request("thread/start", withOptionalModel({ cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox, serviceName: "agentrun" }, options.model), requestTimeoutMs), "thread/start"); + waitingFor = "thread/start"; + const startedAt = Date.now(); + emitCodexOtelSpan("codex_stdio.thread_start.start", options, env, { waitingFor, requestedThreadId: null }, { startTimeMs: startedAt }); + let response: JsonRecord; + try { + response = requireResponseRecord(await client!.request("thread/start", withOptionalModel({ cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox, serviceName: "agentrun" }, options.model), requestTimeoutMs), "thread/start"); + } catch (error) { + emitCodexOtelSpan("codex_stdio.thread_start.failed", options, env, { waitingFor, failureKind: normalizeFailure(error).failureKind }, { startTimeMs: startedAt, endTimeMs: Date.now(), status: "error", error }); + throw error; + } const nextThreadId = requireNestedId(response, "thread/start", "thread"); emitEvent({ type: "backend_status", payload: { phase: `${phasePrefix}:completed`, threadId: nextThreadId } }); + emitCodexOtelSpan("codex_stdio.thread_start.completed", options, env, { waitingFor, threadId: nextThreadId }, { startTimeMs: startedAt, endTimeMs: Date.now() }); return nextThreadId; }; @@ -549,12 +626,17 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess emitEvent({ type: "backend_status", payload: { phase: "codex-rollout-storage-mounted", pvcName: sessionPvcName, pvcNamespace: sessionPvcNamespace, mountPath: sessionPvcMountPath, codexRolloutSubdir: codexRolloutSubdirEnv ?? "sessions", valuesPrinted: false } }); } if (options.threadId) { + waitingFor = "thread/resume"; + const startedAt = Date.now(); + emitCodexOtelSpan("codex_stdio.thread_resume.start", options, env, { waitingFor, requestedThreadId: options.threadId }, { startTimeMs: startedAt }); try { const threadResponse = requireResponseRecord(await client.request("thread/resume", withOptionalModel({ threadId: options.threadId, cwd: options.cwd, approvalPolicy: options.approvalPolicy, sandbox: options.sandbox }, options.model), requestTimeoutMs), "thread/resume"); threadId = requireNestedId(threadResponse, "thread/resume", "thread"); emitEvent({ type: "backend_status", payload: { phase: "thread/resume:completed", threadId } }); + emitCodexOtelSpan("codex_stdio.thread_resume.completed", options, env, { waitingFor, requestedThreadId: options.threadId, threadId }, { startTimeMs: startedAt, endTimeMs: Date.now() }); } catch (error) { const failure = normalizeFailure(error); + emitCodexOtelSpan("codex_stdio.thread_resume.failed", options, env, { waitingFor, requestedThreadId: options.threadId, failureKind: failure.failureKind }, { startTimeMs: startedAt, endTimeMs: Date.now(), status: "error", error }); if (sessionPvcName && isNoRolloutFoundMessage(failure.message)) { throw new CodexStdioFailure( "session-store-evicted", @@ -571,17 +653,34 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess const promptInjection = initialPromptInjection(options.initialPrompt, willResumeThread); emitEvent({ type: "backend_status", payload: { phase: "initial-prompt-assembly", initialPromptInjected: promptInjection.injected, reason: promptInjection.reason, initialPrompt: options.initialPrompt?.summary ?? { available: false, valuesPrinted: false }, valuesPrinted: false } }); - const turnStart = await Promise.race([ - client.request("turn/start", withOptionalModel({ threadId, input: textInputForUserMessage(options.prompt, promptInjection), cwd: options.cwd, approvalPolicy: options.approvalPolicy }, options.model), requestTimeoutMs).then((response) => ({ kind: "response" as const, response })), - terminalPromise.then(() => ({ kind: "terminal" as const })), - ]); + waitingFor = "turn/start"; + const turnStartStartedAt = Date.now(); + emitCodexOtelSpan("codex_stdio.turn_start.start", options, env, { waitingFor, threadId: threadId ?? null }, { startTimeMs: turnStartStartedAt }); + let turnStart: { kind: "response"; response: unknown } | { kind: "terminal" }; + try { + turnStart = await Promise.race([ + client.request("turn/start", withOptionalModel({ threadId, input: textInputForUserMessage(options.prompt, promptInjection), cwd: options.cwd, approvalPolicy: options.approvalPolicy }, options.model), requestTimeoutMs).then((response) => ({ kind: "response" as const, response })), + terminalPromise.then(() => ({ kind: "terminal" as const })), + ]); + } catch (error) { + emitCodexOtelSpan("codex_stdio.turn_start.failed", options, env, { waitingFor, threadId: threadId ?? null, failureKind: normalizeFailure(error).failureKind }, { startTimeMs: turnStartStartedAt, endTimeMs: Date.now(), status: "error", error }); + throw error; + } if (turnStart.kind === "response") { const turnResponse = requireResponseRecord(turnStart.response, "turn/start"); turnId = requireNestedId(turnResponse, "turn/start", "turn"); emitEvent({ type: "backend_status", payload: { phase: "turn/start:completed", turnId } }); + waitingFor = "turn/completed"; + emitCodexOtelSpan("codex_stdio.turn_start.completed", options, env, { waitingFor, threadId: threadId ?? null, turnId }, { startTimeMs: turnStartStartedAt, endTimeMs: Date.now() }); exposeActiveTurn("turn-start-response"); } else { emitEvent({ type: "backend_status", payload: { phase: "turn/start:interrupted-before-response", threadId: threadId ?? null, turnId: turnId ?? null } }); + const terminalSnapshot = terminal as unknown as { status: TerminalStatus; failureKind: FailureKind | null; message: string | null } | null; + if (terminalSnapshot?.status === "completed") { + emitCodexOtelSpan("codex_stdio.turn_start.completed", options, env, { waitingFor: "terminal-before-response", threadId: threadId ?? null, turnId: turnId ?? null, terminalStatus: terminalSnapshot.status, responseSource: "terminal-before-response" }, { startTimeMs: turnStartStartedAt, endTimeMs: Date.now() }); + } else { + emitCodexOtelSpan("codex_stdio.turn_start.failed", options, env, { waitingFor, threadId: threadId ?? null, turnId: turnId ?? null, terminalStatus: terminalSnapshot?.status ?? null, failureKind: terminalSnapshot?.failureKind ?? null }, { startTimeMs: turnStartStartedAt, endTimeMs: Date.now(), status: "error", error: terminalSnapshot?.message ?? "turn/start interrupted before response" }); + } } if (!terminal) { @@ -594,7 +693,13 @@ async function runCodexStdioTurnWithSession(options: CodexStdioTurnOptions, sess emitEvent({ type: "error", payload: { failureKind: terminal.failureKind, message: terminal.message, phase: "transport:closed-before-terminal", appServerExit: closeEvent(race.closeInfo) } }); } } - if (!terminal) terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server did not emit turn/completed" }; + if (!terminal) { + if (lastToolCall && !missingTerminalAfterToolReported) { + missingTerminalAfterToolReported = true; + emitCodexOtelSpan("codex_stdio.missing_terminal_after_tool", options, env, { waitingFor, idleMs: Math.max(0, Date.now() - lastActivityAt), lastNotificationMethod, threadId: threadId ?? null, turnId: turnId ?? null, lastToolCall }); + } + terminal = { status: "failed", failureKind: "backend-response-invalid", message: "codex app-server did not emit turn/completed" }; + } } catch (error) { if (!terminal) { const failure = normalizeFailure(error); @@ -1216,6 +1321,163 @@ function backendMetadata(options: CodexStdioTurnOptions): JsonRecord { }; } +function emitCodexOtelSpan(name: string, options: CodexStdioTurnOptions, env: NodeJS.ProcessEnv, attributes: JsonRecord = {}, span: { startTimeMs?: number; endTimeMs?: number; status?: "ok" | "error"; error?: unknown } = {}): void { + const context = options.otelContext; + if (!context) return; + void emitAgentRunOtelSpan(name, context.run, env, { + command: context.command, + scopeName: "agentrun.runner", + ...(span.startTimeMs !== undefined ? { startTimeMs: span.startTimeMs } : {}), + ...(span.endTimeMs !== undefined ? { endTimeMs: span.endTimeMs } : {}), + ...(span.status !== undefined ? { status: span.status } : {}), + ...(span.error !== undefined ? { error: span.error } : {}), + attributes: codexOtelAttributes(options, env, attributes), + }); +} + +function emitCodexOtelSpanFromLifecycle(name: string, context: CodexLifecycleOtelContext | null, span: { startTimeMs?: number; endTimeMs?: number; status?: "ok" | "error"; error?: unknown; attributes?: JsonRecord } = {}): void { + if (!context) return; + void emitAgentRunOtelSpan(name, context.run, context.env, { + command: context.command, + scopeName: "agentrun.runner", + ...(span.startTimeMs !== undefined ? { startTimeMs: span.startTimeMs } : {}), + ...(span.endTimeMs !== undefined ? { endTimeMs: span.endTimeMs } : {}), + ...(span.status !== undefined ? { status: span.status } : {}), + ...(span.error !== undefined ? { error: span.error } : {}), + attributes: { ...context.attributes, ...(span.attributes ?? {}), valuesPrinted: false }, + }); +} + +function codexLifecycleOtelContext(options: CodexStdioTurnOptions, env: NodeJS.ProcessEnv): CodexLifecycleOtelContext | null { + if (!options.otelContext) return null; + return { + env, + run: options.otelContext.run, + command: options.otelContext.command, + attributes: codexOtelAttributes(options, env), + }; +} + +function codexOtelAttributes(options: CodexStdioTurnOptions, env: NodeJS.ProcessEnv, extra: JsonRecord = {}): JsonRecord { + const context = options.otelContext; + const profile = options.backendProfile ?? context?.run.backendProfile ?? "codex"; + return { + runId: context?.run.id ?? null, + commandId: context?.command.id ?? null, + runnerJobId: context?.runnerJobId ?? null, + runnerId: context?.runnerId ?? null, + attemptId: context?.attemptId ?? null, + sessionId: context?.run.sessionRef?.sessionId ?? null, + threadId: context?.run.sessionRef?.threadId ?? options.threadId ?? null, + backendProfile: profile, + codexHome: pathSummary(resolveCodexHome(options)), + appServerSessionId: shortHash(codexClientKey(options, env)), + jobName: context?.jobName ?? null, + podName: context?.podName ?? null, + sourceCommit: context?.sourceCommit ?? null, + logPath: context?.logPath ? pathSummary(context.logPath) : null, + ...extra, + valuesPrinted: false, + }; +} + +function closeEventAttributes(closeInfo: CodexStdioCloseInfo): JsonRecord { + return { + exitCode: closeInfo.code, + signal: closeInfo.signal, + stderrBytes: closeInfo.stderrBytes, + stderrTruncated: closeInfo.stderrTruncated, + valuesPrinted: false, + }; +} + +function codexIdleWarningMs(env: NodeJS.ProcessEnv, turnTimeoutMs: number): number { + const configured = Number(env.AGENTRUN_CODEX_STDIO_IDLE_WARNING_MS ?? env.AGENTRUN_CODEX_IDLE_WARNING_MS); + if (Number.isFinite(configured) && configured > 0) return Math.max(250, Math.floor(configured)); + if (turnTimeoutMs > defaultIdleWarningMs) return defaultIdleWarningMs; + return Math.max(250, Math.floor(turnTimeoutMs / 2)); +} + +function emitCodexNotificationOtel(options: CodexStdioTurnOptions, env: NodeJS.ProcessEnv, message: JsonRecord, state: JsonRecord): void { + const attributes = { ...state, ...notificationOtelAttributes(message) }; + emitCodexOtelSpan("codex_stdio.notification", options, env, attributes); + const method = String(attributes.method ?? "unknown"); + if (method === "item/agentMessage/delta") emitCodexOtelSpan("codex_stdio.assistant_delta", options, env, attributes); + const tool = toolCallSummaryFromNotification(message); + if (tool) { + const status = tool.status === "failed" ? "failed" : tool.status === "started" ? "started" : "completed"; + emitCodexOtelSpan(`codex_stdio.tool_call.${status}`, options, env, { ...state, ...tool }, { status: status === "failed" ? "error" : "ok", error: status === "failed" ? "tool call failed" : undefined }); + } + if (method === "turn/completed") { + const failureKind = typeof attributes.failureKind === "string" ? attributes.failureKind : null; + emitCodexOtelSpan("codex_stdio.turn_completed", options, env, attributes, { status: failureKind ? "error" : "ok", error: failureKind ?? undefined }); + if (failureKind === "provider-stream-disconnected") emitCodexOtelSpan("codex_stdio.provider_stream_disconnected", options, env, attributes, { status: "error", error: failureKind }); + } + if (method === "error") { + const failureKind = typeof attributes.failureKind === "string" ? attributes.failureKind : null; + if (failureKind === "provider-stream-disconnected") emitCodexOtelSpan("codex_stdio.provider_stream_disconnected", options, env, attributes, { status: "error", error: failureKind }); + } +} + +function notificationOtelAttributes(message: JsonRecord): JsonRecord { + const method = typeof message.method === "string" ? message.method : "unknown"; + const params = asRecordAt(message, "params"); + const item = asRecordAt(params, "item"); + const turn = asRecordAt(params, "turn"); + const error = asRecordAt(params, "error"); + const itemType = typeof item.type === "string" ? item.type : null; + const turnStatus = typeof turn.status === "string" ? turn.status : null; + const failureKind = method === "error" + ? classifyCodexErrorRecord(error, "backend-failed") + : method === "turn/completed" && turnStatus !== "completed" + ? classifyCodexErrorRecord(Object.keys(error).length > 0 ? error : { message: turnStatus ?? "unknown" }, "backend-failed") + : null; + return { + method, + itemId: stringAt(item, "id") ?? stringAt(params, "itemId"), + itemType, + itemStatus: typeof item.status === "string" ? item.status : null, + turnStatus, + turnId: stringAt(turn, "id"), + failureKind, + willRetry: typeof params.willRetry === "boolean" ? params.willRetry : null, + deltaChars: typeof params.delta === "string" ? params.delta.length : null, + valuesPrinted: false, + }; +} + +function toolCallSummaryFromNotification(message: JsonRecord): JsonRecord | null { + const method = typeof message.method === "string" ? message.method : ""; + if (method !== "item/started" && method !== "item/completed") return null; + const item = asRecordAt(asRecordAt(message, "params"), "item"); + const itemType = typeof item.type === "string" ? item.type : "unknown"; + if (!isVisibleCodexToolItemType(itemType)) return null; + const command = toolCallCommandSummary(item, itemType, toolCallName(item, itemType)); + return { + method, + itemId: stringAt(item, "id"), + itemType, + toolName: toolCallName(item, itemType), + status: toolCallStatus(method, item), + exitCode: typeof item.exitCode === "number" ? item.exitCode : null, + durationMs: typeof item.durationMs === "number" ? item.durationMs : null, + commandFingerprint: command ? shortHash(command) : null, + valuesPrinted: false, + }; +} + +function waitingForAfterNotification(message: JsonRecord, terminal: boolean): string { + if (terminal) return "terminal"; + const method = typeof message.method === "string" ? message.method : "unknown"; + const tool = toolCallSummaryFromNotification(message); + if (tool?.status === "started") return "tool-call"; + if (tool?.status === "completed" || tool?.status === "failed") return "turn/completed"; + if (method === "item/agentMessage/delta") return "assistant-delta"; + if (method === "turn/started" || method === "turn/start") return "turn/completed"; + if (method === "error") return asRecordAt(message, "params").willRetry === true ? "provider-retry" : "terminal"; + return "codex-notification"; +} + function envSummary(env: NodeJS.ProcessEnv): JsonRecord { const keyState: Record = {}; for (const key of childEnvSummaryKeys) keyState[key] = { present: typeof env[key] === "string" && String(env[key]).length > 0 }; diff --git a/src/common/otel-trace.ts b/src/common/otel-trace.ts index 4d3f5d8..f0fe9e6 100644 --- a/src/common/otel-trace.ts +++ b/src/common/otel-trace.ts @@ -29,7 +29,7 @@ export function agentRunOtelTraceContext(run: RunRecord | null | undefined, comm return { businessTraceId, traceId, parentSpanId, traceparent: `00-${traceId}-${parentSpanId}-01`, valuesPrinted: false }; } -export async function emitAgentRunOtelSpan(name: string, run: RunRecord | null | undefined, env: NodeJS.ProcessEnv = process.env, options: { command?: CommandRecord | null; startTimeMs?: number; endTimeMs?: number; status?: "ok" | "error"; error?: unknown; attributes?: JsonRecord; kind?: number } = {}): Promise { +export async function emitAgentRunOtelSpan(name: string, run: RunRecord | null | undefined, env: NodeJS.ProcessEnv = process.env, options: { command?: CommandRecord | null; startTimeMs?: number; endTimeMs?: number; status?: "ok" | "error"; error?: unknown; attributes?: JsonRecord; kind?: number; scopeName?: string } = {}): Promise { const endpoint = resolveOtlpTracesEndpoint(env); if (!endpoint || typeof fetch !== "function") return { ok: false, skipped: true, reason: "otlp-endpoint-missing", valuesPrinted: false }; const context = agentRunOtelTraceContext(run, options.command ?? null); @@ -37,11 +37,12 @@ export async function emitAgentRunOtelSpan(name: string, run: RunRecord | null | const endedAtMs = Number.isFinite(Number(options.endTimeMs)) ? Number(options.endTimeMs) : Date.now(); const spanId = nonZeroHex(randomBytes(8).toString("hex"), ZERO_SPAN_ID); const statusCode = options.status === "error" || options.error ? 2 : 1; + const resource = resourceAttributes(env, run); const body = { resourceSpans: [{ - resource: { attributes: attributesFromRecord(resourceAttributes(env, run)) }, + resource: { attributes: attributesFromRecord(resource) }, scopeSpans: [{ - scope: { name: "agentrun.manager", version: "1" }, + scope: { name: options.scopeName ?? scopeNameFromResource(resource), version: "1" }, spans: [{ traceId: context.traceId, spanId, @@ -58,6 +59,7 @@ export async function emitAgentRunOtelSpan(name: string, run: RunRecord | null | commandId: options.command?.id ?? null, sessionId: run?.sessionRef?.sessionId ?? null, ...options.attributes, + valuesPrinted: false, }), status: { code: statusCode, @@ -102,6 +104,10 @@ function resourceAttributes(env: NodeJS.ProcessEnv, run: RunRecord | null | unde }; } +function scopeNameFromResource(resource: JsonRecord): string { + return resource["service.name"] === "agentrun-runner" ? "agentrun.runner" : "agentrun.manager"; +} + function attributesFromRecord(record: JsonRecord): Array<{ key: string; value: JsonRecord }> { return Object.entries(record) .filter(([, value]) => value !== undefined && value !== null) diff --git a/src/mgr/kubernetes-runner-job.ts b/src/mgr/kubernetes-runner-job.ts index 50c549a..884fe43 100644 --- a/src/mgr/kubernetes-runner-job.ts +++ b/src/mgr/kubernetes-runner-job.ts @@ -198,6 +198,7 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore; mutation: true, runId: run.id, commandId, + runnerJobId: render.runnerJobId, attemptId: render.attemptId, runnerId: render.runnerId, namespace: render.namespace, @@ -213,6 +214,7 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore; runner: { runId: run.id, commandId, + runnerJobId: render.runnerJobId, attemptId: render.attemptId, runnerId: render.runnerId, backendProfile: run.backendProfile, @@ -250,6 +252,7 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore; }, } satisfies JsonRecord; const saved = await options.store.saveRunnerJob({ + id: render.runnerJobId, runId: run.id, commandId, idempotencyKey: idempotencyKey ?? null, @@ -267,6 +270,7 @@ export async function createKubernetesRunnerJob(options: { store: AgentRunStore; await options.store.appendEvent(run.id, "backend_status", { phase: "runner-job-created", commandId, + runnerJobId: saved.id, attemptId: saved.attemptId, runnerId: saved.runnerId, namespace: saved.namespace, diff --git a/src/mgr/postgres-store.ts b/src/mgr/postgres-store.ts index ce21467..4a50d76 100644 --- a/src/mgr/postgres-store.ts +++ b/src/mgr/postgres-store.ts @@ -571,7 +571,7 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( } } const at = nowIso(); - const record: RunnerJobRecord = { ...input, id: newId("rjob"), idempotencyKey: input.idempotencyKey ?? null, serviceAccountName: input.serviceAccountName ?? null, createdAt: at, updatedAt: at }; + const record: RunnerJobRecord = { ...input, id: input.id ?? newId("rjob"), idempotencyKey: input.idempotencyKey ?? null, serviceAccountName: input.serviceAccountName ?? null, createdAt: at, updatedAt: at }; const inserted = await client.query( `INSERT INTO agentrun_runner_jobs (id, run_id, command_id, idempotency_key, payload_hash, attempt_id, runner_id, namespace, job_name, manager_url, image, source_commit, service_account_name, result, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14::jsonb, $15, $16) diff --git a/src/mgr/store.ts b/src/mgr/store.ts index 4c1c86d..f0c3feb 100644 --- a/src/mgr/store.ts +++ b/src/mgr/store.ts @@ -89,6 +89,7 @@ export interface UpdateQueueTaskAttemptInput { } export interface SaveRunnerJobInput { + id?: string; runId: string; commandId: string; idempotencyKey?: string | null; @@ -213,7 +214,7 @@ export class MemoryAgentRunStore implements AgentRunStore { if (existing) return existing; } const at = nowIso(); - const record: RunnerJobRecord = { ...input, id: newId("rjob"), idempotencyKey: input.idempotencyKey ?? null, serviceAccountName: input.serviceAccountName ?? null, createdAt: at, updatedAt: at }; + const record: RunnerJobRecord = { ...input, id: input.id ?? newId("rjob"), idempotencyKey: input.idempotencyKey ?? null, serviceAccountName: input.serviceAccountName ?? null, createdAt: at, updatedAt: at }; this.runnerJobs.set(record.id, record); return record; } diff --git a/src/runner/k8s-job.ts b/src/runner/k8s-job.ts index d5d711f..79cd717 100644 --- a/src/runner/k8s-job.ts +++ b/src/runner/k8s-job.ts @@ -1,4 +1,4 @@ -import { stableHash } from "../common/validation.js"; +import { newId, stableHash } from "../common/validation.js"; import type { BackendProfile, ExecutionPolicy, JsonRecord, JsonValue, RunRecord, SecretRef } from "../common/types.js"; import { backendProfileSpec } from "../common/backend-profiles.js"; import { staticWorkReadyCapabilitySummary } from "../common/work-ready.js"; @@ -42,6 +42,7 @@ const defaultRunnerNoProxyItems = [ export interface RunnerJobRenderOptions { run: RunRecord; commandId: string; + runnerJobId?: string; managerUrl: string; image: string; bootRepoUrl?: string; @@ -148,10 +149,11 @@ export function renderRunnerJobDryRun(options: RunnerJobRenderOptions): JsonReco }; } -export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { manifest: JsonRecord; namespace: string; jobName: string; runnerId: string; attemptId: string; sourceCommit: string; serviceAccountName: string; secretRefs: CredentialProjection[]; toolCredentials: ToolCredentialProjection[]; warnings: string[]; ttlSecondsAfterFinished: number; runnerIdleTimeoutMs: number } { +export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { manifest: JsonRecord; namespace: string; jobName: string; runnerJobId: string; runnerId: string; attemptId: string; sourceCommit: string; serviceAccountName: string; secretRefs: CredentialProjection[]; toolCredentials: ToolCredentialProjection[]; warnings: string[]; ttlSecondsAfterFinished: number; runnerIdleTimeoutMs: number } { const namespace = options.namespace ?? "agentrun-v01"; const attemptId = options.attemptId ?? `attempt_${Date.now().toString(36)}`; const runnerId = options.runnerId ?? `runner_${shortHash(`${options.run.id}:${attemptId}:${options.commandId}`)}`; + const runnerJobId = options.runnerJobId ?? newId("rjob"); const sourceCommit = options.sourceCommit ?? process.env.AGENTRUN_SOURCE_COMMIT ?? "unknown"; const serviceAccountName = options.serviceAccountName ?? "agentrun-v01-runner"; const jobNamePrefix = normalizeJobNamePrefix(options.jobNamePrefix ?? serviceAccountName); @@ -164,7 +166,7 @@ export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { mani const sessionPvc = options.sessionPvc; const warnings: string[] = []; if (secretRefs.length === 0) warnings.push("run executionPolicy.secretScope 未声明 provider SecretRef;runner 将按 secret-unavailable 上报,而不会降级直连外部凭据"); - const env = runnerEnv(options, { namespace, jobName, runnerId, attemptId, sourceCommit, secretRefs, toolCredentials, sessionPvc, runnerIdleTimeoutMs }); + const env = runnerEnv(options, { namespace, jobName, runnerJobId, runnerId, attemptId, sourceCommit, secretRefs, toolCredentials, sessionPvc, runnerIdleTimeoutMs }); const manifest: JsonRecord = { apiVersion: "batch/v1", kind: "Job", @@ -227,10 +229,10 @@ export function renderRunnerJobManifest(options: RunnerJobRenderOptions): { mani }, }, }; - return { manifest, namespace, jobName, runnerId, attemptId, sourceCommit, serviceAccountName, secretRefs, toolCredentials, warnings, ttlSecondsAfterFinished, runnerIdleTimeoutMs }; + return { manifest, namespace, jobName, runnerJobId, runnerId, attemptId, sourceCommit, serviceAccountName, secretRefs, toolCredentials, warnings, ttlSecondsAfterFinished, runnerIdleTimeoutMs }; } -function runnerEnv(options: RunnerJobRenderOptions, context: { namespace: string; jobName: string; runnerId: string; attemptId: string; sourceCommit: string; secretRefs: CredentialProjection[]; toolCredentials: ToolCredentialProjection[]; sessionPvc: RunnerSessionPvcOptions | undefined; runnerIdleTimeoutMs: number }): JsonRecord[] { +function runnerEnv(options: RunnerJobRenderOptions, context: { namespace: string; jobName: string; runnerJobId: string; runnerId: string; attemptId: string; sourceCommit: string; secretRefs: CredentialProjection[]; toolCredentials: ToolCredentialProjection[]; sessionPvc: RunnerSessionPvcOptions | undefined; runnerIdleTimeoutMs: number }): JsonRecord[] { const selectedSecret = context.secretRefs.find((item) => item.profile === options.run.backendProfile); const codexHome = selectedSecret?.runtimeMountPath ?? defaultRuntimeHome(options.run.backendProfile); const bootRepoUrl = optionalString(options.bootRepoUrl) ?? defaultBootRepoUrl; @@ -239,6 +241,7 @@ function runnerEnv(options: RunnerJobRenderOptions, context: { namespace: string { name: "AGENTRUN_API_KEY", valueFrom: { secretKeyRef: { name: "agentrun-v01-api-key", key: "HWLAB_API_KEY" } } }, { name: "AGENTRUN_RUN_ID", value: options.run.id }, { name: "AGENTRUN_COMMAND_ID", value: options.commandId }, + { name: "AGENTRUN_RUNNER_JOB_ID", value: context.runnerJobId }, { name: "AGENTRUN_ATTEMPT_ID", value: context.attemptId }, { name: "AGENTRUN_RUNNER_ID", value: context.runnerId }, { name: "AGENTRUN_BACKEND_PROFILE", value: options.run.backendProfile }, @@ -262,6 +265,7 @@ function runnerEnv(options: RunnerJobRenderOptions, context: { namespace: string { name: "AGENTRUN_RUNNER_POLL_INTERVAL_MS", value: "250" }, { name: "HOME", value: "/home/agentrun" }, { name: "CODEX_HOME", value: codexHome }, + ...runnerOtelEnvVars(process.env), ...(selectedSecret ? [{ name: "AGENTRUN_CODEX_SECRET_HOME", value: selectedSecret.projectionMountPath }] : []), ...(context.sessionPvc ? [ { name: "AGENTRUN_SESSION_PVC_NAME", value: context.sessionPvc.pvcName }, @@ -359,6 +363,39 @@ function runnerEgressProxyEnvVars(): JsonRecord[] { ]; } +function runnerOtelEnvVars(env: NodeJS.ProcessEnv): JsonRecord[] { + const tracesEndpoint = firstNonEmpty(env.AGENTRUN_OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, env.OTEL_EXPORTER_OTLP_TRACES_ENDPOINT); + const baseEndpoint = firstNonEmpty(env.AGENTRUN_OTEL_EXPORTER_OTLP_ENDPOINT, env.OTEL_EXPORTER_OTLP_ENDPOINT); + return [ + ...(tracesEndpoint ? [ + { name: "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", value: tracesEndpoint }, + { name: "AGENTRUN_OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", value: tracesEndpoint }, + ] : []), + ...(!tracesEndpoint && baseEndpoint ? [ + { name: "OTEL_EXPORTER_OTLP_ENDPOINT", value: baseEndpoint }, + { name: "AGENTRUN_OTEL_EXPORTER_OTLP_ENDPOINT", value: baseEndpoint }, + ] : []), + { name: "OTEL_SERVICE_NAME", value: "agentrun-runner" }, + ...optionalEnvVar("AGENTRUN_LANE", env.AGENTRUN_LANE), + ...optionalEnvVar("UNIDESK_NODE_ID", env.UNIDESK_NODE_ID ?? env.AGENTRUN_NODE_ID), + ...optionalEnvVar("AGENTRUN_NODE_ID", env.AGENTRUN_NODE_ID ?? env.UNIDESK_NODE_ID), + ...optionalEnvVar("HWLAB_RUNTIME_LANE", env.HWLAB_RUNTIME_LANE), + ]; +} + +function optionalEnvVar(name: string, value: string | undefined): JsonRecord[] { + const normalized = value?.trim(); + return normalized ? [{ name, value: normalized }] : []; +} + +function firstNonEmpty(...values: Array): string | null { + for (const value of values) { + const text = value?.trim(); + if (text) return text; + } + return null; +} + function runnerEgressProxyUrl(env: NodeJS.ProcessEnv): string { const value = env.AGENTRUN_RUNNER_EGRESS_PROXY_URL?.trim(); return value && value.length > 0 ? value : fallbackRunnerEgressProxyUrl; diff --git a/src/runner/main.ts b/src/runner/main.ts index 0a8a6f7..0796ffc 100644 --- a/src/runner/main.ts +++ b/src/runner/main.ts @@ -15,6 +15,7 @@ const options: RunnerOnceOptions = { runId, }; if (process.env.AGENTRUN_COMMAND_ID) options.commandId = process.env.AGENTRUN_COMMAND_ID; +if (process.env.AGENTRUN_RUNNER_JOB_ID) options.runnerJobId = process.env.AGENTRUN_RUNNER_JOB_ID; if (process.env.AGENTRUN_ATTEMPT_ID) options.attemptId = process.env.AGENTRUN_ATTEMPT_ID; if (process.env.AGENTRUN_RUNNER_ID) options.runnerId = process.env.AGENTRUN_RUNNER_ID; if (process.env.AGENTRUN_BACKEND_PROFILE) { diff --git a/src/runner/run-once.ts b/src/runner/run-once.ts index 57f6e03..ada8911 100644 --- a/src/runner/run-once.ts +++ b/src/runner/run-once.ts @@ -1,3 +1,5 @@ +import { appendFile, mkdir, writeFile } from "node:fs/promises"; +import path from "node:path"; import { RunnerManagerApi, failureKindFromError, terminalStatusForFailure, errorMessage } from "./manager-api.js"; import { createBackendSession, runBackendTurn, type BackendActiveTurnControl, type BackendAdapterOptions, type BackendSession } from "../backend/adapter.js"; import { materializeResourceBundle } from "./resource-bundle.js"; @@ -9,6 +11,7 @@ export interface RunnerOnceOptions extends BackendAdapterOptions { managerUrl: string; runId: string; commandId?: string; + runnerJobId?: string; runnerId?: string; attemptId?: string; leaseMs?: number; @@ -38,7 +41,13 @@ interface RunnerFailure { details?: JsonRecord | null; } +interface RunnerLogSink { + write(label: string, payload: JsonRecord): Promise; +} + export async function runOnce(options: RunnerOnceOptions): Promise { + const runnerLog = await createRunnerLogSink(options.logPath); + await runnerLog.write("runner.starting", { runId: options.runId, commandId: options.commandId ?? null, runnerJobId: options.runnerJobId ?? null, valuesPrinted: false }); const api = new RunnerManagerApi(options.managerUrl); const targetRun = await api.getRun(options.runId); if (isTerminalRun(targetRun)) return { terminalStatus: targetRun.terminalStatus, failureKind: targetRun.failureKind, run: targetRun, skipped: "run-terminal" } as JsonRecord; @@ -54,10 +63,12 @@ export async function runOnce(options: RunnerOnceOptions): Promise { placement: options.placement ?? "host-process", sourceCommit: options.sourceCommit ?? process.env.AGENTRUN_SOURCE_COMMIT ?? "unknown", ...(options.runnerId ? { runnerId: options.runnerId } : {}), + ...(options.runnerJobId ? { runnerJobId: options.runnerJobId } : {}), ...(options.jobName ? { jobName: options.jobName } : {}), ...(options.podName ? { podName: options.podName } : {}), ...(options.logPath ? { logPath: options.logPath } : {}), }) as RunnerRecord; + await runnerLog.write("runner.registered", { runId: options.runId, runnerId: runner.id, runnerJobId: options.runnerJobId ?? null, attemptId, valuesPrinted: false }); try { const imageWorkReady = await smokeImageWorkReadyCapabilities(options.env ?? process.env); await api.appendEvent(options.runId, { type: "backend_status", payload: { phase: "runner-image-work-ready-smoke", attemptId, runnerId: runner.id, ...imageWorkReady } }); @@ -133,7 +144,7 @@ export async function runOnce(options: RunnerOnceOptions): Promise { const result = materializationFailure ? await reportCommandFailure(api, options.runId, command.id, runner, attemptId, materializationFailure, "runner:resource-bundle", { terminalRun: true }) - : await executeCommand(api, withResourceAssembly(options, resourceEnv, initialPrompt), command, runner, attemptId, workspacePath, backendSession ?? (backendSession = createBackendSession(currentRun, withResourceAssembly(options, resourceEnv, initialPrompt)))); + : await executeCommand(api, withResourceAssembly(options, resourceEnv, initialPrompt), command, runner, attemptId, workspacePath, backendSession ?? (backendSession = createBackendSession(currentRun, withResourceAssembly(options, resourceEnv, initialPrompt))), runnerLog); commandResults.push(result); idleSince = Date.now(); if (options.oneShot === true) { @@ -213,12 +224,13 @@ function pathDelimiter(): string { return process.platform === "win32" ? ";" : ":"; } -async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions, command: CommandRecord, runner: RunnerRecord, attemptId: string, workspacePath: string | undefined, backendSession: BackendSession | null): Promise { +async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions, command: CommandRecord, runner: RunnerRecord, attemptId: string, workspacePath: string | undefined, backendSession: BackendSession | null, runnerLog: RunnerLogSink): Promise { await api.ackCommand(command.id); const acked = await api.getCommand(options.runId, command.id); if (acked.state === "cancelled") return await reportCancelled(api, options.runId, command.id, runner, attemptId, "command cancelled before backend start"); await assertNotCancelled(api, options.runId, command.id); await api.appendEvent(options.runId, { type: "backend_status", payload: { phase: "backend-turn-started", commandId: command.id, attemptId, runnerId: runner.id, backendProfile: options.backendProfile ?? null, workspaceReady: Boolean(workspacePath) } }); + await runnerLog.write("command.started", { runId: options.runId, commandId: command.id, runnerId: runner.id, runnerJobId: options.runnerJobId ?? null, attemptId, backendProfile: options.backendProfile ?? null, valuesPrinted: false }); const abortController = new AbortController(); const stopCancelWatch = watchCancellation(api, options.runId, command.id, abortController); const backendProgress = startBackendProgress(); @@ -229,7 +241,19 @@ async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions, ...options, ...(workspacePath ? { workspacePath } : {}), abortSignal: abortController.signal, + otelContext: { + run: latestRun, + command, + attemptId, + runnerId: runner.id, + runnerJobId: options.runnerJobId ?? null, + jobName: options.jobName ?? null, + podName: options.podName ?? null, + sourceCommit: options.sourceCommit ?? null, + logPath: options.logPath ?? null, + }, onEvent: async (event: BackendEvent) => { + await runnerLog.write("backend.event", runnerLogEventSummary(event, options.runId, command.id, attemptId, runner.id, options.runnerJobId ?? null)); await appendBestEffort(api, options.runId, annotateCommandEvent(event, command.id, attemptId, runner.id)); }, onActiveTurn: (control: BackendActiveTurnControl) => { @@ -245,6 +269,7 @@ async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions, await api.appendEvent(options.runId, annotateCommandEvent(event, command.id, attemptId, runner.id)); } await api.reportCommandStatus(command.id, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage, ...(result.threadId ? { threadId: result.threadId } : {}), ...(result.turnId ? { turnId: result.turnId } : {}) }); + await runnerLog.write("command.terminal", { runId: options.runId, commandId: command.id, runnerId: runner.id, runnerJobId: options.runnerJobId ?? null, attemptId, terminalStatus: result.terminalStatus, failureKind: result.failureKind, valuesPrinted: false }); return { commandId: command.id, terminalStatus: result.terminalStatus, failureKind: result.failureKind } as CommandExecutionResult; } catch (error) { const failureKind = failureKindFromError(error); @@ -537,3 +562,50 @@ function normalizePollIntervalMs(value: number | undefined): number { if (!Number.isFinite(value ?? NaN)) return 250; return Math.max(50, Math.min(2_000, Math.floor(value!))); } + +async function createRunnerLogSink(logPath: string | undefined): Promise { + const normalized = logPath?.trim(); + if (!normalized) return { write: async () => undefined }; + try { + await mkdir(path.dirname(normalized), { recursive: true }); + await writeFile(normalized, "", { flag: "a" }); + } catch { + return { write: async () => undefined }; + } + return { + write: async (label: string, payload: JsonRecord) => { + try { + const line = JSON.stringify({ at: new Date().toISOString(), label, ...payload, valuesPrinted: false }) + "\n"; + await appendFile(normalized, line, "utf8"); + } catch { + // Local runner log is diagnostic-only; manager events and terminal state remain authoritative. + } + }, + }; +} + +function runnerLogEventSummary(event: BackendEvent, runId: string, commandId: string, attemptId: string, runnerId: string, runnerJobId: string | null): JsonRecord { + const payload = event.payload ?? {}; + return { + runId, + commandId, + runnerId, + runnerJobId, + attemptId, + eventType: event.type, + phase: stringPayload(payload, "phase"), + terminalStatus: stringPayload(payload, "terminalStatus"), + failureKind: stringPayload(payload, "failureKind"), + threadId: stringPayload(payload, "threadId"), + turnId: stringPayload(payload, "turnId"), + itemId: stringPayload(payload, "itemId"), + itemType: stringPayload(payload, "type"), + method: stringPayload(payload, "method"), + valuesPrinted: false, + }; +} + +function stringPayload(record: JsonRecord, key: string): string | null { + const value = record[key]; + return typeof value === "string" && value.trim().length > 0 ? value.trim() : null; +}