From c0a5704c10b249748148660c17520fe5055fd85a Mon Sep 17 00:00:00 2001 From: Lyon <88232613+pikasTech@users.noreply.github.com> Date: Tue, 23 Jun 2026 11:44:49 +0800 Subject: [PATCH] fix: emit terminal otel spans for command status (#230) --- src/mgr/server.ts | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/src/mgr/server.ts b/src/mgr/server.ts index 275cfe5..a2c4115 100644 --- a/src/mgr/server.ts +++ b/src/mgr/server.ts @@ -827,34 +827,43 @@ async function route({ method, url, body, store, sourceCommit, authSummary, runn store.getRun(runId), store.appendEvent(runId, type, payload), ]); - emitRunEventOtelSpan(type, payload, run, startedAt); + await emitRunEventOtelSpan(type, payload, run, startedAt); return event as unknown as JsonValue; } const statusMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/status$/u); if (method === "PATCH" && statusMatch) { + const startedAt = Date.now(); + const runId = statusMatch[1] ?? ""; const record = asRecord(body, "status"); const terminalStatus = record.terminalStatus === "completed" || record.terminalStatus === "failed" || record.terminalStatus === "blocked" || record.terminalStatus === "cancelled" ? record.terminalStatus : "failed"; - return await store.finishRun(statusMatch[1] ?? "", { + const run = await store.finishRun(runId, { terminalStatus, failureKind: typeof record.failureKind === "string" ? record.failureKind as never : null, failureMessage: typeof record.failureMessage === "string" ? record.failureMessage : null, ...(typeof record.threadId === "string" ? { threadId: record.threadId } : {}), ...(typeof record.turnId === "string" ? { turnId: record.turnId } : {}), - }) as unknown as JsonValue; + }); + await emitRunEventOtelSpan("terminal_status", { terminalStatus, failureKind: typeof record.failureKind === "string" ? record.failureKind : null, message: typeof record.failureMessage === "string" ? record.failureMessage : null, threadId: typeof record.threadId === "string" ? record.threadId : null, turnId: typeof record.turnId === "string" ? record.turnId : null }, run, startedAt); + return run as unknown as JsonValue; } const ackMatch = path.match(/^\/api\/v1\/commands\/([^/]+)\/ack$/u); if (method === "POST" && ackMatch) return await store.ackCommand(ackMatch[1] ?? "") as unknown as JsonValue; const commandStatusMatch = path.match(/^\/api\/v1\/commands\/([^/]+)\/status$/u); if (method === "PATCH" && commandStatusMatch) { + const startedAt = Date.now(); const record = asRecord(body, "commandStatus"); const terminalStatus = record.terminalStatus === "completed" || record.terminalStatus === "failed" || record.terminalStatus === "blocked" || record.terminalStatus === "cancelled" ? record.terminalStatus : "failed"; - return await store.finishCommand(commandStatusMatch[1] ?? "", { + const command = await store.finishCommand(commandStatusMatch[1] ?? "", { terminalStatus, failureKind: typeof record.failureKind === "string" ? record.failureKind as never : null, failureMessage: typeof record.failureMessage === "string" ? record.failureMessage : null, ...(typeof record.threadId === "string" ? { threadId: record.threadId } : {}), ...(typeof record.turnId === "string" ? { turnId: record.turnId } : {}), - }) as unknown as JsonValue; + }); + const run = await store.getRun(command.runId); + await emitRunEventOtelSpan("backend_status", { phase: "command-terminal", commandId: command.id, state: command.state, terminalStatus, failureKind: typeof record.failureKind === "string" ? record.failureKind : null, message: typeof record.failureMessage === "string" ? record.failureMessage : null, threadId: typeof record.threadId === "string" ? record.threadId : null, turnId: typeof record.turnId === "string" ? record.turnId : null }, run, startedAt); + if (command.type === "turn") await emitRunEventOtelSpan("terminal_status", { terminalStatus, failureKind: typeof record.failureKind === "string" ? record.failureKind : null, message: typeof record.failureMessage === "string" ? record.failureMessage : null, commandId: command.id, threadId: typeof record.threadId === "string" ? record.threadId : null, turnId: typeof record.turnId === "string" ? record.turnId : null }, run, startedAt); + return command as unknown as JsonValue; } const commandCancelMatch = path.match(/^\/api\/v1\/commands\/([^/]+)\/cancel$/u); if (method === "POST" && commandCancelMatch) { @@ -1072,14 +1081,14 @@ function commandIsTerminal(command: CommandRecord): boolean { return command.state === "completed" || command.state === "failed" || command.state === "cancelled"; } -function emitRunEventOtelSpan(type: RunEvent["type"], payload: JsonRecord, run: RunRecord, startedAt: number): void { +async function emitRunEventOtelSpan(type: RunEvent["type"], payload: JsonRecord, run: RunRecord, startedAt: number): Promise { const phase = stringJsonValue(payload.phase); const terminalStatus = stringJsonValue(payload.terminalStatus); const failureKind = stringJsonValue(payload.failureKind); const eventName = runEventOtelSpanName(type, phase, terminalStatus, failureKind); if (!eventName) return; const isError = type === "error" || terminalStatus === "failed" || terminalStatus === "blocked"; - void emitAgentRunOtelSpan(eventName, run, process.env, { + await emitAgentRunOtelSpan(eventName, run, process.env, { startTimeMs: startedAt, kind: 2, status: isError ? "error" : "ok",