Merge pull request #196 from pikasTech/fix-193-otel-trace-spans
feat: add AgentRun manager OTel trace spans
This commit is contained in:
@@ -0,0 +1,140 @@
|
|||||||
|
import { createHash, randomBytes } from "node:crypto";
|
||||||
|
import type { CommandRecord, JsonRecord, RunRecord } from "./types.js";
|
||||||
|
|
||||||
|
/** Milliseconds an OTLP export request may run before its AbortController is triggered. */
const OTLP_TIMEOUT_MS = 1_500;

/** All-zero 32-hex-digit trace id; passed to nonZeroHex as the value to reject and the fallback template. */
const ZERO_TRACE_ID = "0".repeat(32);

/** All-zero 16-hex-digit span id; passed to nonZeroHex as the value to reject and the fallback template. */
const ZERO_SPAN_ID = "0".repeat(16);
|
||||||
|
|
||||||
|
/**
 * Looks up the business trace id (a `trc_…` token) associated with a run and/or command.
 *
 * Candidates are inspected in this priority order and the first well-formed one wins:
 * `run.traceSink.traceId`, `run.traceSink.businessTraceId`,
 * `run.sessionRef.metadata.hwlabTraceId`, `run.sessionRef.metadata.traceId`,
 * `command.payload.traceId`.
 *
 * @param run - Run record to inspect; may be null/undefined.
 * @param command - Optional command whose payload may carry a trace id.
 * @returns The trimmed id when a candidate matches `trc_` followed by one or more of
 *   `[A-Za-z0-9_.:-]`, otherwise `null`.
 */
export function agentRunBusinessTraceId(run: RunRecord | null | undefined, command?: CommandRecord | null): string | null {
  const sink = asRecord(run?.traceSink);
  const metadata = asRecord(run?.sessionRef?.metadata);
  const payload = asRecord(command?.payload);

  const candidates: unknown[] = [
    sink?.traceId,
    sink?.businessTraceId,
    metadata?.hwlabTraceId,
    metadata?.traceId,
    payload?.traceId,
  ];

  const businessIdPattern = /^trc_[A-Za-z0-9_.:-]+$/u;
  for (const candidate of candidates) {
    // Non-string candidates can never match, so skip them outright.
    if (typeof candidate !== "string") continue;
    const trimmed = candidate.trim();
    if (businessIdPattern.test(trimmed)) return trimmed;
  }
  return null;
}
|
||||||
|
|
||||||
|
/**
 * Derives deterministic trace identifiers for a run.
 *
 * The business trace id comes from agentRunBusinessTraceId, falling back to
 * `run_<run.id>` (or `run_unknown`). The trace id and parent span id are the first
 * 32 / 16 hex digits of SHA-256 digests over a fixed prefix plus the business trace id,
 * so the same business id always yields the same identifiers.
 *
 * @param run - Run record the context is derived from; may be null/undefined.
 * @param command - Optional command consulted for a business trace id.
 * @returns `businessTraceId`, `traceId`, `parentSpanId`, a `traceparent` string of the
 *   form `00-<traceId>-<parentSpanId>-01`, and the constant flag `valuesPrinted: false`.
 */
export function agentRunOtelTraceContext(run: RunRecord | null | undefined, command?: CommandRecord | null) {
  const businessTraceId = agentRunBusinessTraceId(run, command) ?? `run_${String(run?.id ?? "unknown")}`;

  // Leading `length` hex digits of the SHA-256 digest of `seed`.
  const hexPrefix = (seed: string, length: number): string =>
    createHash("sha256").update(seed).digest("hex").slice(0, length);

  const traceId = nonZeroHex(hexPrefix(`hwlab-code-agent:${businessTraceId}`, 32), ZERO_TRACE_ID);
  const parentSpanId = nonZeroHex(hexPrefix(`agentrun-manager-parent:${businessTraceId}`, 16), ZERO_SPAN_ID);
  const traceparent = `00-${traceId}-${parentSpanId}-01`;

  return { businessTraceId, traceId, parentSpanId, traceparent, valuesPrinted: false };
}
|
||||||
|
|
||||||
|
/**
 * Posts a single span describing one AgentRun manager stage to an OTLP/HTTP JSON traces endpoint.
 *
 * The call is best-effort: callers fire it with `void`, so every failure while building or
 * sending the payload is reported in the returned record instead of being thrown.
 *
 * @param name - Span name; also recorded as the `agentrun.stage` attribute.
 * @param run - Run the span belongs to; used for trace context and resource attributes.
 * @param env - Environment used to resolve the endpoint and resource attributes.
 * @param options - Optional span details:
 *   `command` (adds `commandId` and may supply the business trace id),
 *   `startTimeMs` / `endTimeMs` (epoch milliseconds; default to now),
 *   `status` / `error` (either marks the span as failed),
 *   `attributes` (merged last, so they may override the built-in attributes),
 *   `kind` (numeric OTLP span kind; defaults to 1).
 * @returns A summary record: `{ ok, status }` after a response was received,
 *   `{ ok: false, skipped: true, reason }` when no export was attempted, or
 *   `{ ok: false, error }` with `otlp-encode-failed`, `otlp-timeout` or `otlp-send-failed`.
 */
export async function emitAgentRunOtelSpan(name: string, run: RunRecord | null | undefined, env: NodeJS.ProcessEnv = process.env, options: { command?: CommandRecord | null; startTimeMs?: number; endTimeMs?: number; status?: "ok" | "error"; error?: unknown; attributes?: JsonRecord; kind?: number } = {}): Promise<JsonRecord> {
  const endpoint = resolveOtlpTracesEndpoint(env);
  if (!endpoint) return { ok: false, skipped: true, reason: "otlp-endpoint-missing", valuesPrinted: false };
  // Report a missing global fetch under its own reason so it is not mistaken for a config problem.
  if (typeof fetch !== "function") return { ok: false, skipped: true, reason: "fetch-unavailable", valuesPrinted: false };

  let payload: string;
  try {
    const context = agentRunOtelTraceContext(run, options.command ?? null);
    const startedAtMs = Number.isFinite(Number(options.startTimeMs)) ? Number(options.startTimeMs) : Date.now();
    const endedAtMs = Number.isFinite(Number(options.endTimeMs)) ? Number(options.endTimeMs) : Date.now();
    // Each emitted span gets a fresh random id; only trace id and parent id are deterministic.
    const spanId = nonZeroHex(randomBytes(8).toString("hex"), ZERO_SPAN_ID);
    // OTLP status codes: 1 = OK, 2 = ERROR.
    const statusCode = options.status === "error" || options.error ? 2 : 1;
    const body = {
      resourceSpans: [{
        resource: { attributes: attributesFromRecord(resourceAttributes(env, run)) },
        scopeSpans: [{
          scope: { name: "agentrun.manager", version: "1" },
          spans: [{
            traceId: context.traceId,
            spanId,
            parentSpanId: context.parentSpanId,
            name,
            kind: Number(options.kind ?? 1),
            startTimeUnixNano: unixNano(startedAtMs),
            // Clamp so the span never ends before it starts.
            endTimeUnixNano: unixNano(Math.max(startedAtMs, endedAtMs)),
            attributes: attributesFromRecord({
              traceId: context.businessTraceId,
              "otel.trace_id": context.traceId,
              "agentrun.stage": name,
              runId: run?.id ?? null,
              commandId: options.command?.id ?? null,
              sessionId: run?.sessionRef?.sessionId ?? null,
              ...options.attributes,
            }),
            status: {
              code: statusCode,
              // Cap the message length to keep the payload small.
              ...(options.error ? { message: String((options.error as Error)?.message ?? options.error).slice(0, 300) } : {}),
            },
          }],
        }],
      }],
    };
    payload = JSON.stringify(body);
  } catch {
    // Encoding problems (e.g. un-serialisable attributes) must not surface as an
    // unhandled rejection in fire-and-forget callers.
    return { ok: false, error: "otlp-encode-failed", valuesPrinted: false };
  }

  const controller = new AbortController();
  const timeout = setTimeout(() => controller.abort(), OTLP_TIMEOUT_MS);
  try {
    const response = await fetch(endpoint, {
      method: "POST",
      headers: { "content-type": "application/json" },
      body: payload,
      signal: controller.signal,
    });
    // The body is not needed; cancel it so the underlying connection can be released.
    void response.body?.cancel().catch(() => undefined);
    return { ok: response.ok, status: response.status, valuesPrinted: false };
  } catch (error) {
    return { ok: false, error: (error as Error)?.name === "AbortError" ? "otlp-timeout" : "otlp-send-failed", valuesPrinted: false };
  } finally {
    clearTimeout(timeout);
  }
}
|
||||||
|
|
||||||
|
/**
 * Picks the URL that spans are posted to.
 *
 * A traces-specific endpoint (`AGENTRUN_OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`, then
 * `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`) is used as given, minus trailing slashes.
 * Otherwise a base endpoint (`AGENTRUN_OTEL_EXPORTER_OTLP_ENDPOINT`, then
 * `OTEL_EXPORTER_OTLP_ENDPOINT`) has `/v1/traces` appended.
 *
 * @param env - Environment variables to read.
 * @returns The resolved endpoint, or `null` when none of the variables is set.
 */
function resolveOtlpTracesEndpoint(env: NodeJS.ProcessEnv): string | null {
  const stripTrailingSlashes = (url: string): string => url.replace(/\/+$/u, "");

  const tracesEndpoint = firstNonEmpty(env.AGENTRUN_OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, env.OTEL_EXPORTER_OTLP_TRACES_ENDPOINT);
  if (tracesEndpoint !== null) return stripTrailingSlashes(tracesEndpoint);

  const baseEndpoint = firstNonEmpty(env.AGENTRUN_OTEL_EXPORTER_OTLP_ENDPOINT, env.OTEL_EXPORTER_OTLP_ENDPOINT);
  if (baseEndpoint === null) return null;
  return `${stripTrailingSlashes(baseEndpoint)}/v1/traces`;
}
|
||||||
|
|
||||||
|
/**
 * Builds the resource-level attributes attached to every exported span.
 *
 * Each attribute takes the first non-empty value from its environment variables and
 * falls back to a fixed default (`agentrun-manager` for the service name, `unknown`
 * elsewhere). `hwlab.lane` additionally consults `run.traceSink.hwlabLane` before
 * falling back.
 *
 * @param env - Environment variables to read.
 * @param run - Run whose trace sink may supply the lane; may be null/undefined.
 * @returns A flat record keyed by attribute name.
 */
function resourceAttributes(env: NodeJS.ProcessEnv, run: RunRecord | null | undefined): JsonRecord {
  const fallback = "unknown";
  const laneFromRun = stringValue(asRecord(run?.traceSink)?.hwlabLane);

  const attributes: JsonRecord = {
    "service.name": firstNonEmpty(env.OTEL_SERVICE_NAME, "agentrun-manager"),
    "deployment.environment": firstNonEmpty(env.AGENTRUN_LANE, fallback),
    "unidesk.node": firstNonEmpty(env.UNIDESK_NODE_ID, env.AGENTRUN_NODE_ID, fallback),
    "hwlab.lane": firstNonEmpty(env.HWLAB_RUNTIME_LANE, laneFromRun, fallback),
    "k8s.namespace.name": firstNonEmpty(env.POD_NAMESPACE, env.AGENTRUN_RUNTIME_NAMESPACE, fallback),
    "git.commit": firstNonEmpty(env.AGENTRUN_SOURCE_COMMIT, fallback),
  };
  return attributes;
}
|
||||||
|
|
||||||
|
/**
 * Converts a flat record into a list of `{ key, value }` attribute entries.
 *
 * Entries whose value is `null` or `undefined` are dropped; every other value is
 * wrapped by otlpAnyValue. Entry order follows `Object.entries(record)`.
 *
 * @param record - Attribute names mapped to raw values.
 * @returns The encoded attribute list.
 */
function attributesFromRecord(record: JsonRecord): Array<{ key: string; value: JsonRecord }> {
  const attributes: Array<{ key: string; value: JsonRecord }> = [];
  for (const [key, raw] of Object.entries(record)) {
    if (raw === undefined || raw === null) continue;
    attributes.push({ key, value: otlpAnyValue(raw) });
  }
  return attributes;
}
|
||||||
|
|
||||||
|
/**
 * Wraps a raw attribute value in the single-key object used for OTLP JSON attribute values.
 *
 * Mapping:
 * - boolean -> `{ boolValue }`
 * - string -> `{ stringValue }`
 * - safe integer number, or bigint that fits in 64 signed bits -> `{ intValue: "<decimal>" }`
 * - any other finite number -> `{ doubleValue }`
 * - NaN / ±Infinity / out-of-range bigint -> `{ stringValue }` with the value's text form
 * - everything else -> `{ stringValue }` holding its JSON text, or its
 *   `Object.prototype.toString` tag when it cannot be serialised
 *
 * This function never throws and never returns an undefined `stringValue`.
 *
 * @param value - Raw attribute value.
 * @returns The wrapped value.
 */
function otlpAnyValue(value: unknown): JsonRecord {
  if (typeof value === "boolean") return { boolValue: value };
  if (typeof value === "string") return { stringValue: value };

  if (typeof value === "bigint") {
    // Only emit intValue when the bigint survives a round-trip through 64 signed bits.
    return BigInt.asIntN(64, value) === value ? { intValue: value.toString() } : { stringValue: value.toString() };
  }

  if (typeof value === "number") {
    // Integers beyond the safe range are already imprecise and may print in exponent
    // notation (e.g. "1e+21"), which is not a valid integer string; send them as doubles.
    if (Number.isSafeInteger(value)) return { intValue: String(value) };
    if (Number.isFinite(value)) return { doubleValue: value };
    // NaN / Infinity have no JSON number form; keep them readable as text.
    return { stringValue: String(value) };
  }

  try {
    // JSON.stringify returns undefined for functions/symbols and throws on circular
    // structures or nested bigints; both cases fall through to the tag below.
    const encoded = JSON.stringify(value);
    if (encoded !== undefined) return { stringValue: encoded };
  } catch {
    // fall through to the type tag
  }
  return { stringValue: Object.prototype.toString.call(value) };
}
|
||||||
|
|
||||||
|
/**
 * Converts epoch milliseconds into a decimal nanosecond string.
 *
 * The whole-millisecond part is scaled with BigInt arithmetic so the result is exact and
 * always plain decimal digits; a float product would exceed 2^53 for present-day
 * timestamps and switch to exponent notation once it reaches 1e21. Any fractional
 * millisecond is truncated to whole nanoseconds.
 *
 * @param ms - Milliseconds since the Unix epoch; may be fractional.
 * @returns The nanosecond count as a decimal string, or `"0"` when `ms` is NaN or
 *   infinite so the output is still a well-formed integer string.
 */
function unixNano(ms: number): string {
  if (!Number.isFinite(ms)) return "0";
  const wholeMs = Math.trunc(ms);
  // The sub-millisecond remainder is below 1e6 ns, so ordinary float math is exact enough.
  const fractionalNanos = Math.trunc((ms - wholeMs) * 1_000_000);
  return String(BigInt(wholeMs) * BigInt(1_000_000) + BigInt(fractionalNanos));
}
|
||||||
|
|
||||||
|
/**
 * Returns the first argument that still has content after trimming.
 *
 * `null` and `undefined` count as empty strings.
 *
 * @param values - Candidates in priority order.
 * @returns The trimmed first non-blank candidate, or `null` when every candidate is blank.
 */
function firstNonEmpty(...values: Array<string | null | undefined>): string | null {
  const trimmed = values.map((candidate) => String(candidate ?? "").trim());
  return trimmed.find((text) => text.length > 0) ?? null;
}
|
||||||
|
|
||||||
|
/**
 * Narrows an unknown value to a trimmed, non-blank string.
 *
 * @param value - Any value.
 * @returns The trimmed string, or `null` when `value` is not a string or is blank.
 */
function stringValue(value: unknown): string | null {
  if (typeof value !== "string") return null;
  const trimmed = value.trim();
  return trimmed === "" ? null : trimmed;
}
|
||||||
|
|
||||||
|
/**
 * Treats a value as a key/value record when it is a non-null, non-array object.
 *
 * @param value - Any value.
 * @returns The same object typed as a record, or `null` for primitives, `null` and arrays.
 */
function asRecord(value: unknown): JsonRecord | null {
  if (value === null || typeof value !== "object") return null;
  if (Array.isArray(value)) return null;
  return value as JsonRecord;
}
|
||||||
|
|
||||||
|
/**
 * Guards a hex identifier against being malformed or all zeros.
 *
 * `value` is accepted only when it has the same length as `zero`, consists solely of
 * lowercase hex digits, and differs from `zero`. The length check matters because a
 * shorter or longer string (including a short all-zero one such as "000") would
 * otherwise pass as a valid identifier.
 *
 * @param value - Candidate identifier.
 * @param zero - The all-zero identifier of the required length.
 * @returns `value` when acceptable; otherwise `zero` with its final `0` replaced by `1`.
 */
function nonZeroHex(value: string, zero: string): string {
  const wellFormed = value.length === zero.length && /^[0-9a-f]+$/u.test(value);
  if (wellFormed && value !== zero) return value;
  return zero.replace(/0$/u, "1");
}
|
||||||
+49
-8
@@ -20,6 +20,7 @@ import { getProviderProfileConfig, getProviderProfileValidation, listBackendCapa
|
|||||||
import { listToolCredentials, setGithubSshToolCredential, showToolCredential } from "./tool-credentials.js";
|
import { listToolCredentials, setGithubSshToolCredential, showToolCredential } from "./tool-credentials.js";
|
||||||
import { aipodSpecFromInput, applyAipodSpec, deleteAipodSpec, listAipodSpecs, renderAipodSpecByName, showAipodSpec } from "../common/aipod-specs.js";
|
import { aipodSpecFromInput, applyAipodSpec, deleteAipodSpec, listAipodSpecs, renderAipodSpecByName, showAipodSpec } from "../common/aipod-specs.js";
|
||||||
import { staticWorkReadyCapabilitySummary } from "../common/work-ready.js";
|
import { staticWorkReadyCapabilitySummary } from "../common/work-ready.js";
|
||||||
|
import { emitAgentRunOtelSpan } from "../common/otel-trace.js";
|
||||||
|
|
||||||
function pvcOptions(defaults: { kubectlCommand?: string } | undefined): SessionPvcOptions {
|
function pvcOptions(defaults: { kubectlCommand?: string } | undefined): SessionPvcOptions {
|
||||||
return defaults?.kubectlCommand ? { kubectlCommand: defaults.kubectlCommand } : {};
|
return defaults?.kubectlCommand ? { kubectlCommand: defaults.kubectlCommand } : {};
|
||||||
@@ -652,17 +653,36 @@ async function route({ method, url, body, store, sourceCommit, authSummary, runn
|
|||||||
await refreshRunningQueueTasksForRead(store, queue);
|
await refreshRunningQueueTasksForRead(store, queue);
|
||||||
return await queueCommanderForRead(store, queue, url.searchParams.get("readerId"));
|
return await queueCommanderForRead(store, queue, url.searchParams.get("readerId"));
|
||||||
}
|
}
|
||||||
if (method === "POST" && path === "/api/v1/runs") return await store.createRun(validateCreateRun(body)) as unknown as JsonValue;
|
if (method === "POST" && path === "/api/v1/runs") {
|
||||||
|
const startedAt = Date.now();
|
||||||
|
const run = await store.createRun(validateCreateRun(body));
|
||||||
|
void emitAgentRunOtelSpan("run_created", run, process.env, { startTimeMs: startedAt, kind: 2, attributes: { "http.method": "POST", "http.route": "/api/v1/runs", "http.status_code": 200, backendProfile: run.backendProfile, providerId: run.providerId } });
|
||||||
|
return run as unknown as JsonValue;
|
||||||
|
}
|
||||||
const runMatch = path.match(/^\/api\/v1\/runs\/([^/]+)$/u);
|
const runMatch = path.match(/^\/api\/v1\/runs\/([^/]+)$/u);
|
||||||
if (method === "GET" && runMatch) return await store.getRun(runMatch[1] ?? "") as unknown as JsonValue;
|
if (method === "GET" && runMatch) return await store.getRun(runMatch[1] ?? "") as unknown as JsonValue;
|
||||||
const eventMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/events$/u);
|
const eventMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/events$/u);
|
||||||
if (method === "GET" && eventMatch) {
|
if (method === "GET" && eventMatch) {
|
||||||
|
const startedAt = Date.now();
|
||||||
|
const runId = eventMatch[1] ?? "";
|
||||||
const afterSeq = integerQuery(url, "afterSeq", 0);
|
const afterSeq = integerQuery(url, "afterSeq", 0);
|
||||||
const limit = integerQuery(url, "limit", 100);
|
const limit = integerQuery(url, "limit", 100);
|
||||||
return { items: await store.listEvents(eventMatch[1] ?? "", afterSeq, limit) as unknown as JsonValue };
|
const run = await store.getRun(runId);
|
||||||
|
const items = await store.listEvents(runId, afterSeq, limit);
|
||||||
|
void emitAgentRunOtelSpan("projection_sync", run, process.env, { startTimeMs: startedAt, kind: 2, attributes: { "http.method": "GET", "http.route": "/api/v1/runs/:runId/events", "http.status_code": 200, afterSeq, limit, eventCount: items.length } });
|
||||||
|
return { items: items as unknown as JsonValue };
|
||||||
}
|
}
|
||||||
const runResultMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/result$/u);
|
const runResultMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/result$/u);
|
||||||
if (method === "GET" && runResultMatch) return await buildRunResult(store, runResultMatch[1] ?? "", url.searchParams.get("commandId") ?? undefined) as JsonValue;
|
if (method === "GET" && runResultMatch) {
|
||||||
|
const startedAt = Date.now();
|
||||||
|
const runId = runResultMatch[1] ?? "";
|
||||||
|
const run = await store.getRun(runId);
|
||||||
|
const commandId = url.searchParams.get("commandId") ?? undefined;
|
||||||
|
const command = commandId ? await store.getCommand(commandId) : null;
|
||||||
|
const result = await buildRunResult(store, runId, commandId) as JsonValue;
|
||||||
|
void emitAgentRunOtelSpan("command_result", run, process.env, { command, startTimeMs: startedAt, kind: 2, attributes: { "http.method": "GET", "http.route": "/api/v1/runs/:runId/result", "http.status_code": 200, terminalStatus: typeof result === "object" && result !== null && !Array.isArray(result) ? (result as JsonRecord).terminalStatus ?? null : null } });
|
||||||
|
return result;
|
||||||
|
}
|
||||||
const runCancelMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/cancel$/u);
|
const runCancelMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/cancel$/u);
|
||||||
if (method === "POST" && runCancelMatch) {
|
if (method === "POST" && runCancelMatch) {
|
||||||
const record = body === null ? {} : asRecord(body, "cancel");
|
const record = body === null ? {} : asRecord(body, "cancel");
|
||||||
@@ -670,16 +690,29 @@ async function route({ method, url, body, store, sourceCommit, authSummary, runn
|
|||||||
return await store.cancelRun(runCancelMatch[1] ?? "", reason) as unknown as JsonValue;
|
return await store.cancelRun(runCancelMatch[1] ?? "", reason) as unknown as JsonValue;
|
||||||
}
|
}
|
||||||
const commandCreateMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands$/u);
|
const commandCreateMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands$/u);
|
||||||
if (method === "POST" && commandCreateMatch) return await store.createCommand(commandCreateMatch[1] ?? "", validateCreateCommand(body)) as unknown as JsonValue;
|
if (method === "POST" && commandCreateMatch) {
|
||||||
|
const startedAt = Date.now();
|
||||||
|
const runId = commandCreateMatch[1] ?? "";
|
||||||
|
const command = await store.createCommand(runId, validateCreateCommand(body));
|
||||||
|
const run = await store.getRun(runId);
|
||||||
|
void emitAgentRunOtelSpan("command_created", run, process.env, { command, startTimeMs: startedAt, kind: 2, attributes: { "http.method": "POST", "http.route": "/api/v1/runs/:runId/commands", "http.status_code": 200, commandType: command.type, commandState: command.state } });
|
||||||
|
return command as unknown as JsonValue;
|
||||||
|
}
|
||||||
if (method === "GET" && commandCreateMatch) return { items: await store.listCommands(commandCreateMatch[1] ?? "", integerQuery(url, "afterSeq", 0), integerQuery(url, "limit", 20)) as unknown as JsonValue };
|
if (method === "GET" && commandCreateMatch) return { items: await store.listCommands(commandCreateMatch[1] ?? "", integerQuery(url, "afterSeq", 0), integerQuery(url, "limit", 20)) as unknown as JsonValue };
|
||||||
const runnerJobMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/runner-jobs$/u);
|
const runnerJobMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/runner-jobs$/u);
|
||||||
if (method === "POST" && runnerJobMatch) {
|
if (method === "POST" && runnerJobMatch) {
|
||||||
return await createKubernetesRunnerJob({
|
const startedAt = Date.now();
|
||||||
|
const runId = runnerJobMatch[1] ?? "";
|
||||||
|
const runnerJob = await createKubernetesRunnerJob({
|
||||||
store,
|
store,
|
||||||
runId: runnerJobMatch[1] ?? "",
|
runId,
|
||||||
input: asRecord(body ?? {}, "runnerJob") as never,
|
input: asRecord(body ?? {}, "runnerJob") as never,
|
||||||
defaults: runnerJobDefaultsForRequest(runnerJobDefaults, sourceCommit),
|
defaults: runnerJobDefaultsForRequest(runnerJobDefaults, sourceCommit),
|
||||||
}) as unknown as JsonValue;
|
}) as unknown as JsonRecord;
|
||||||
|
const run = await store.getRun(runId);
|
||||||
|
const command = typeof runnerJob.commandId === "string" ? await store.getCommand(runnerJob.commandId) : null;
|
||||||
|
void emitAgentRunOtelSpan("runner_job_created", run, process.env, { command, startTimeMs: startedAt, kind: 2, attributes: { "http.method": "POST", "http.route": "/api/v1/runs/:runId/runner-jobs", "http.status_code": 200, jobName: typeof runnerJob.jobName === "string" ? runnerJob.jobName : null, namespace: typeof runnerJob.namespace === "string" ? runnerJob.namespace : null } });
|
||||||
|
return runnerJob as unknown as JsonValue;
|
||||||
}
|
}
|
||||||
if (method === "GET" && runnerJobMatch) {
|
if (method === "GET" && runnerJobMatch) {
|
||||||
const runId = runnerJobMatch[1] ?? "";
|
const runId = runnerJobMatch[1] ?? "";
|
||||||
@@ -700,7 +733,15 @@ async function route({ method, url, body, store, sourceCommit, authSummary, runn
|
|||||||
const commandShowMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands\/([^/]+)$/u);
|
const commandShowMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands\/([^/]+)$/u);
|
||||||
if (method === "GET" && commandShowMatch) return await store.getCommand(commandShowMatch[2] ?? "") as unknown as JsonValue;
|
if (method === "GET" && commandShowMatch) return await store.getCommand(commandShowMatch[2] ?? "") as unknown as JsonValue;
|
||||||
const commandResultMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands\/([^/]+)\/result$/u);
|
const commandResultMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands\/([^/]+)\/result$/u);
|
||||||
if (method === "GET" && commandResultMatch) return await buildRunResult(store, commandResultMatch[1] ?? "", commandResultMatch[2] ?? "") as JsonValue;
|
if (method === "GET" && commandResultMatch) {
|
||||||
|
const startedAt = Date.now();
|
||||||
|
const runId = commandResultMatch[1] ?? "";
|
||||||
|
const commandId = commandResultMatch[2] ?? "";
|
||||||
|
const [run, command] = await Promise.all([store.getRun(runId), store.getCommand(commandId)]);
|
||||||
|
const result = await buildRunResult(store, runId, commandId) as JsonValue;
|
||||||
|
void emitAgentRunOtelSpan("command_result", run, process.env, { command, startTimeMs: startedAt, kind: 2, attributes: { "http.method": "GET", "http.route": "/api/v1/runs/:runId/commands/:commandId/result", "http.status_code": 200, terminalStatus: typeof result === "object" && result !== null && !Array.isArray(result) ? (result as JsonRecord).terminalStatus ?? null : null } });
|
||||||
|
return result;
|
||||||
|
}
|
||||||
if (method === "POST" && path === "/api/v1/runners/register") return await store.registerRunner(asRecord(body ?? {}, "runner")) as unknown as JsonValue;
|
if (method === "POST" && path === "/api/v1/runners/register") return await store.registerRunner(asRecord(body ?? {}, "runner")) as unknown as JsonValue;
|
||||||
const claimMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/claim$/u);
|
const claimMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/claim$/u);
|
||||||
if (method === "POST" && claimMatch) {
|
if (method === "POST" && claimMatch) {
|
||||||
|
|||||||
Reference in New Issue
Block a user