From 18c3667ee0ea16c159f4533dd971a65344ce9472 Mon Sep 17 00:00:00 2001 From: lyon Date: Fri, 19 Jun 2026 23:05:50 +0800 Subject: [PATCH] feat: add manager otel spans --- src/common/otel-trace.ts | 140 +++++++++++++++++++++++++++++++++++++++ src/mgr/server.ts | 57 +++++++++++++--- 2 files changed, 189 insertions(+), 8 deletions(-) create mode 100644 src/common/otel-trace.ts diff --git a/src/common/otel-trace.ts b/src/common/otel-trace.ts new file mode 100644 index 0000000..4d3f5d8 --- /dev/null +++ b/src/common/otel-trace.ts @@ -0,0 +1,140 @@ +import { createHash, randomBytes } from "node:crypto"; +import type { CommandRecord, JsonRecord, RunRecord } from "./types.js"; + +const OTLP_TIMEOUT_MS = 1500; +const ZERO_TRACE_ID = "00000000000000000000000000000000"; +const ZERO_SPAN_ID = "0000000000000000"; + +export function agentRunBusinessTraceId(run: RunRecord | null | undefined, command?: CommandRecord | null): string | null { + const traceSink = asRecord(run?.traceSink); + const sessionMetadata = asRecord(run?.sessionRef?.metadata); + const commandPayload = asRecord(command?.payload); + for (const value of [ + traceSink?.traceId, + traceSink?.businessTraceId, + sessionMetadata?.hwlabTraceId, + sessionMetadata?.traceId, + commandPayload?.traceId, + ]) { + const text = typeof value === "string" ? value.trim() : ""; + if (/^trc_[A-Za-z0-9_.:-]+$/u.test(text)) return text; + } + return null; +} + +export function agentRunOtelTraceContext(run: RunRecord | null | undefined, command?: CommandRecord | null) { + const businessTraceId = agentRunBusinessTraceId(run, command) ?? `run_${String(run?.id ?? "unknown")}`; + const traceId = nonZeroHex(createHash("sha256").update(`hwlab-code-agent:${businessTraceId}`).digest("hex").slice(0, 32), ZERO_TRACE_ID); + const parentSpanId = nonZeroHex(createHash("sha256").update(`agentrun-manager-parent:${businessTraceId}`).digest("hex").slice(0, 16), ZERO_SPAN_ID); + return { businessTraceId, traceId, parentSpanId, traceparent: `00-${traceId}-${parentSpanId}-01`, valuesPrinted: false }; +} + +export async function emitAgentRunOtelSpan(name: string, run: RunRecord | null | undefined, env: NodeJS.ProcessEnv = process.env, options: { command?: CommandRecord | null; startTimeMs?: number; endTimeMs?: number; status?: "ok" | "error"; error?: unknown; attributes?: JsonRecord; kind?: number } = {}): Promise { + const endpoint = resolveOtlpTracesEndpoint(env); + if (!endpoint || typeof fetch !== "function") return { ok: false, skipped: true, reason: "otlp-endpoint-missing", valuesPrinted: false }; + const context = agentRunOtelTraceContext(run, options.command ?? null); + const startedAtMs = Number.isFinite(Number(options.startTimeMs)) ? Number(options.startTimeMs) : Date.now(); + const endedAtMs = Number.isFinite(Number(options.endTimeMs)) ? Number(options.endTimeMs) : Date.now(); + const spanId = nonZeroHex(randomBytes(8).toString("hex"), ZERO_SPAN_ID); + const statusCode = options.status === "error" || options.error ? 2 : 1; + const body = { + resourceSpans: [{ + resource: { attributes: attributesFromRecord(resourceAttributes(env, run)) }, + scopeSpans: [{ + scope: { name: "agentrun.manager", version: "1" }, + spans: [{ + traceId: context.traceId, + spanId, + parentSpanId: context.parentSpanId, + name, + kind: Number(options.kind ?? 1), + startTimeUnixNano: unixNano(startedAtMs), + endTimeUnixNano: unixNano(Math.max(startedAtMs, endedAtMs)), + attributes: attributesFromRecord({ + traceId: context.businessTraceId, + "otel.trace_id": context.traceId, + "agentrun.stage": name, + runId: run?.id ?? null, + commandId: options.command?.id ?? null, + sessionId: run?.sessionRef?.sessionId ?? null, + ...options.attributes, + }), + status: { + code: statusCode, + ...(options.error ? { message: String((options.error as Error)?.message ?? options.error).slice(0, 300) } : {}), + }, + }], + }], + }], + }; + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), OTLP_TIMEOUT_MS); + try { + const response = await fetch(endpoint, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify(body), + signal: controller.signal, + }); + return { ok: response.ok, status: response.status, valuesPrinted: false }; + } catch (error) { + return { ok: false, error: (error as Error)?.name === "AbortError" ? "otlp-timeout" : "otlp-send-failed", valuesPrinted: false }; + } finally { + clearTimeout(timeout); + } +} + +function resolveOtlpTracesEndpoint(env: NodeJS.ProcessEnv): string | null { + const explicit = firstNonEmpty(env.AGENTRUN_OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, env.OTEL_EXPORTER_OTLP_TRACES_ENDPOINT); + if (explicit) return explicit.replace(/\/+$/u, ""); + const base = firstNonEmpty(env.AGENTRUN_OTEL_EXPORTER_OTLP_ENDPOINT, env.OTEL_EXPORTER_OTLP_ENDPOINT); + return base ? `${base.replace(/\/+$/u, "")}/v1/traces` : null; +} + +function resourceAttributes(env: NodeJS.ProcessEnv, run: RunRecord | null | undefined): JsonRecord { + return { + "service.name": firstNonEmpty(env.OTEL_SERVICE_NAME, "agentrun-manager"), + "deployment.environment": firstNonEmpty(env.AGENTRUN_LANE, "unknown"), + "unidesk.node": firstNonEmpty(env.UNIDESK_NODE_ID, env.AGENTRUN_NODE_ID, "unknown"), + "hwlab.lane": firstNonEmpty(env.HWLAB_RUNTIME_LANE, stringValue(asRecord(run?.traceSink)?.hwlabLane), "unknown"), + "k8s.namespace.name": firstNonEmpty(env.POD_NAMESPACE, env.AGENTRUN_RUNTIME_NAMESPACE, "unknown"), + "git.commit": firstNonEmpty(env.AGENTRUN_SOURCE_COMMIT, "unknown"), + }; +} + +function attributesFromRecord(record: JsonRecord): Array<{ key: string; value: JsonRecord }> { + return Object.entries(record) + .filter(([, value]) => value !== undefined && value !== null) + .map(([key, value]) => ({ key, value: otlpAnyValue(value) })); +} + +function otlpAnyValue(value: unknown): JsonRecord { + if (typeof value === "boolean") return { boolValue: value }; + if (typeof value === "number" && Number.isInteger(value)) return { intValue: String(value) }; + if (typeof value === "number" && Number.isFinite(value)) return { doubleValue: value }; + return { stringValue: typeof value === "string" ? value : JSON.stringify(value) }; +} + +function unixNano(ms: number): string { + return String(Math.trunc(ms * 1_000_000)); +} + +function firstNonEmpty(...values: Array): string | null { + for (const value of values) { + const text = String(value ?? "").trim(); + if (text) return text; + } + return null; +} + +function stringValue(value: unknown): string | null { + return typeof value === "string" && value.trim() ? value.trim() : null; +} + +function asRecord(value: unknown): JsonRecord | null { + return typeof value === "object" && value !== null && !Array.isArray(value) ? value as JsonRecord : null; +} + +function nonZeroHex(value: string, zero: string): string { + return /^[0-9a-f]+$/u.test(value) && value !== zero ? value : zero.replace(/0$/u, "1"); +} diff --git a/src/mgr/server.ts b/src/mgr/server.ts index 830b6dc..805ef68 100644 --- a/src/mgr/server.ts +++ b/src/mgr/server.ts @@ -20,6 +20,7 @@ import { getProviderProfileConfig, getProviderProfileValidation, listBackendCapa import { listToolCredentials, setGithubSshToolCredential, showToolCredential } from "./tool-credentials.js"; import { aipodSpecFromInput, applyAipodSpec, deleteAipodSpec, listAipodSpecs, renderAipodSpecByName, showAipodSpec } from "../common/aipod-specs.js"; import { staticWorkReadyCapabilitySummary } from "../common/work-ready.js"; +import { emitAgentRunOtelSpan } from "../common/otel-trace.js"; function pvcOptions(defaults: { kubectlCommand?: string } | undefined): SessionPvcOptions { return defaults?.kubectlCommand ? { kubectlCommand: defaults.kubectlCommand } : {}; @@ -652,17 +653,36 @@ async function route({ method, url, body, store, sourceCommit, authSummary, runn await refreshRunningQueueTasksForRead(store, queue); return await queueCommanderForRead(store, queue, url.searchParams.get("readerId")); } - if (method === "POST" && path === "/api/v1/runs") return await store.createRun(validateCreateRun(body)) as unknown as JsonValue; + if (method === "POST" && path === "/api/v1/runs") { + const startedAt = Date.now(); + const run = await store.createRun(validateCreateRun(body)); + void emitAgentRunOtelSpan("run_created", run, process.env, { startTimeMs: startedAt, kind: 2, attributes: { "http.method": "POST", "http.route": "/api/v1/runs", "http.status_code": 200, backendProfile: run.backendProfile, providerId: run.providerId } }); + return run as unknown as JsonValue; + } const runMatch = path.match(/^\/api\/v1\/runs\/([^/]+)$/u); if (method === "GET" && runMatch) return await store.getRun(runMatch[1] ?? "") as unknown as JsonValue; const eventMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/events$/u); if (method === "GET" && eventMatch) { + const startedAt = Date.now(); + const runId = eventMatch[1] ?? ""; const afterSeq = integerQuery(url, "afterSeq", 0); const limit = integerQuery(url, "limit", 100); - return { items: await store.listEvents(eventMatch[1] ?? "", afterSeq, limit) as unknown as JsonValue }; + const run = await store.getRun(runId); + const items = await store.listEvents(runId, afterSeq, limit); + void emitAgentRunOtelSpan("projection_sync", run, process.env, { startTimeMs: startedAt, kind: 2, attributes: { "http.method": "GET", "http.route": "/api/v1/runs/:runId/events", "http.status_code": 200, afterSeq, limit, eventCount: items.length } }); + return { items: items as unknown as JsonValue }; } const runResultMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/result$/u); - if (method === "GET" && runResultMatch) return await buildRunResult(store, runResultMatch[1] ?? "", url.searchParams.get("commandId") ?? undefined) as JsonValue; + if (method === "GET" && runResultMatch) { + const startedAt = Date.now(); + const runId = runResultMatch[1] ?? ""; + const run = await store.getRun(runId); + const commandId = url.searchParams.get("commandId") ?? undefined; + const command = commandId ? await store.getCommand(commandId) : null; + const result = await buildRunResult(store, runId, commandId) as JsonValue; + void emitAgentRunOtelSpan("command_result", run, process.env, { command, startTimeMs: startedAt, kind: 2, attributes: { "http.method": "GET", "http.route": "/api/v1/runs/:runId/result", "http.status_code": 200, terminalStatus: typeof result === "object" && result !== null && !Array.isArray(result) ? (result as JsonRecord).terminalStatus ?? null : null } }); + return result; + } const runCancelMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/cancel$/u); if (method === "POST" && runCancelMatch) { const record = body === null ? {} : asRecord(body, "cancel"); @@ -670,16 +690,29 @@ async function route({ method, url, body, store, sourceCommit, authSummary, runn return await store.cancelRun(runCancelMatch[1] ?? "", reason) as unknown as JsonValue; } const commandCreateMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands$/u); - if (method === "POST" && commandCreateMatch) return await store.createCommand(commandCreateMatch[1] ?? "", validateCreateCommand(body)) as unknown as JsonValue; + if (method === "POST" && commandCreateMatch) { + const startedAt = Date.now(); + const runId = commandCreateMatch[1] ?? ""; + const command = await store.createCommand(runId, validateCreateCommand(body)); + const run = await store.getRun(runId); + void emitAgentRunOtelSpan("command_created", run, process.env, { command, startTimeMs: startedAt, kind: 2, attributes: { "http.method": "POST", "http.route": "/api/v1/runs/:runId/commands", "http.status_code": 200, commandType: command.type, commandState: command.state } }); + return command as unknown as JsonValue; + } if (method === "GET" && commandCreateMatch) return { items: await store.listCommands(commandCreateMatch[1] ?? "", integerQuery(url, "afterSeq", 0), integerQuery(url, "limit", 20)) as unknown as JsonValue }; const runnerJobMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/runner-jobs$/u); if (method === "POST" && runnerJobMatch) { - return await createKubernetesRunnerJob({ + const startedAt = Date.now(); + const runId = runnerJobMatch[1] ?? ""; + const runnerJob = await createKubernetesRunnerJob({ store, - runId: runnerJobMatch[1] ?? "", + runId, input: asRecord(body ?? {}, "runnerJob") as never, defaults: runnerJobDefaultsForRequest(runnerJobDefaults, sourceCommit), - }) as unknown as JsonValue; + }) as unknown as JsonRecord; + const run = await store.getRun(runId); + const command = typeof runnerJob.commandId === "string" ? await store.getCommand(runnerJob.commandId) : null; + void emitAgentRunOtelSpan("runner_job_created", run, process.env, { command, startTimeMs: startedAt, kind: 2, attributes: { "http.method": "POST", "http.route": "/api/v1/runs/:runId/runner-jobs", "http.status_code": 200, jobName: typeof runnerJob.jobName === "string" ? runnerJob.jobName : null, namespace: typeof runnerJob.namespace === "string" ? runnerJob.namespace : null } }); + return runnerJob as unknown as JsonValue; } if (method === "GET" && runnerJobMatch) { const runId = runnerJobMatch[1] ?? ""; @@ -700,7 +733,15 @@ async function route({ method, url, body, store, sourceCommit, authSummary, runn const commandShowMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands\/([^/]+)$/u); if (method === "GET" && commandShowMatch) return await store.getCommand(commandShowMatch[2] ?? "") as unknown as JsonValue; const commandResultMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands\/([^/]+)\/result$/u); - if (method === "GET" && commandResultMatch) return await buildRunResult(store, commandResultMatch[1] ?? "", commandResultMatch[2] ?? "") as JsonValue; + if (method === "GET" && commandResultMatch) { + const startedAt = Date.now(); + const runId = commandResultMatch[1] ?? ""; + const commandId = commandResultMatch[2] ?? ""; + const [run, command] = await Promise.all([store.getRun(runId), store.getCommand(commandId)]); + const result = await buildRunResult(store, runId, commandId) as JsonValue; + void emitAgentRunOtelSpan("command_result", run, process.env, { command, startTimeMs: startedAt, kind: 2, attributes: { "http.method": "GET", "http.route": "/api/v1/runs/:runId/commands/:commandId/result", "http.status_code": 200, terminalStatus: typeof result === "object" && result !== null && !Array.isArray(result) ? (result as JsonRecord).terminalStatus ?? null : null } }); + return result; + } if (method === "POST" && path === "/api/v1/runners/register") return await store.registerRunner(asRecord(body ?? {}, "runner")) as unknown as JsonValue; const claimMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/claim$/u); if (method === "POST" && claimMatch) {