feat: add manager otel spans
This commit is contained in:
@@ -0,0 +1,140 @@
|
||||
import { createHash, randomBytes } from "node:crypto";
|
||||
import type { CommandRecord, JsonRecord, RunRecord } from "./types.js";
|
||||
|
||||
// Milliseconds to wait for the OTLP POST before the request is aborted.
const OTLP_TIMEOUT_MS = 1500;
|
||||
// All-zero 32-hex-digit id; handed to nonZeroHex as the value a derived trace id must not equal.
const ZERO_TRACE_ID = "00000000000000000000000000000000";
|
||||
// All-zero 16-hex-digit id; handed to nonZeroHex as the value a span id must not equal.
const ZERO_SPAN_ID = "0000000000000000";
|
||||
|
||||
/**
 * Finds the business trace id attached to a run or to one of its commands.
 *
 * Candidates are inspected in priority order: the run's trace sink
 * (`traceId`, then `businessTraceId`), the session metadata (`hwlabTraceId`,
 * then `traceId`) and finally the command payload (`traceId`). The first
 * string that, once trimmed, consists of `trc_` followed by one or more
 * characters from `[A-Za-z0-9_.:-]` is returned.
 *
 * @param run - Run record to inspect; may be null or undefined.
 * @param command - Optional command whose payload may carry a trace id.
 * @returns The trimmed trace id, or `null` when no candidate qualifies.
 */
export function agentRunBusinessTraceId(run: RunRecord | null | undefined, command?: CommandRecord | null): string | null {
  const sink = asRecord(run?.traceSink);
  const metadata = asRecord(run?.sessionRef?.metadata);
  const payload = asRecord(command?.payload);
  const candidates: unknown[] = [
    sink?.traceId,
    sink?.businessTraceId,
    metadata?.hwlabTraceId,
    metadata?.traceId,
    payload?.traceId,
  ];
  // Non-string candidates collapse to "" which can never satisfy the pattern.
  const match = candidates
    .map((candidate) => (typeof candidate === "string" ? candidate.trim() : ""))
    .find((candidate) => /^trc_[A-Za-z0-9_.:-]+$/u.test(candidate));
  return match ?? null;
}
|
||||
|
||||
/**
 * Derives a deterministic trace context for a run.
 *
 * The business trace id (or `run_<id>` when none is found, `run_unknown`
 * when the run has no id) is hashed with SHA-256 under two different
 * prefixes; the first 32 hex digits of one digest become the trace id and
 * the first 16 hex digits of the other become the parent span id. Both are
 * passed through `nonZeroHex` so they never equal the all-zero id.
 *
 * @param run - Run record used to look up the business trace id and run id.
 * @param command - Optional command that may carry the business trace id.
 * @returns The business trace id, derived ids and a `00-<traceId>-<parentSpanId>-01` traceparent string.
 */
export function agentRunOtelTraceContext(run: RunRecord | null | undefined, command?: CommandRecord | null) {
  const businessTraceId = agentRunBusinessTraceId(run, command) ?? `run_${String(run?.id ?? "unknown")}`;
  // Leading `length` hex digits of the SHA-256 digest of `seed`.
  const hashPrefix = (seed: string, length: number): string =>
    createHash("sha256").update(seed).digest("hex").slice(0, length);
  const traceId = nonZeroHex(hashPrefix(`hwlab-code-agent:${businessTraceId}`, 32), ZERO_TRACE_ID);
  const parentSpanId = nonZeroHex(hashPrefix(`agentrun-manager-parent:${businessTraceId}`, 16), ZERO_SPAN_ID);
  const traceparent = `00-${traceId}-${parentSpanId}-01`;
  return { businessTraceId, traceId, parentSpanId, traceparent, valuesPrinted: false };
}
|
||||
|
||||
/**
 * Sends a single span describing an agent-run manager stage to an OTLP/HTTP
 * JSON traces endpoint.
 *
 * Export is best-effort: when no endpoint is configured (or no global `fetch`
 * is available) the call is skipped, and network failures or timeouts are
 * reported in the returned record rather than thrown.
 *
 * @param name - Span name; also recorded as the `agentrun.stage` attribute.
 * @param run - Run the span belongs to; supplies the trace context, run id and session id.
 * @param env - Environment used to resolve the endpoint and the resource attributes.
 * @param options - Optional command, start/end times in epoch milliseconds
 *   (each defaults to "now" when missing or not finite), status, error, extra
 *   attributes and span kind (defaults to 1).
 * @returns `{ ok, status }` once a response arrives, `{ ok: false, skipped: true, reason }`
 *   when the export is skipped, or `{ ok: false, error }` with `"otlp-timeout"` /
 *   `"otlp-send-failed"` on failure. Every variant carries `valuesPrinted: false`.
 */
export async function emitAgentRunOtelSpan(
  name: string,
  run: RunRecord | null | undefined,
  env: NodeJS.ProcessEnv = process.env,
  options: { command?: CommandRecord | null; startTimeMs?: number; endTimeMs?: number; status?: "ok" | "error"; error?: unknown; attributes?: JsonRecord; kind?: number } = {},
): Promise<JsonRecord> {
  const endpoint = resolveOtlpTracesEndpoint(env);
  if (!endpoint || typeof fetch !== "function") return { ok: false, skipped: true, reason: "otlp-endpoint-missing", valuesPrinted: false };
  const context = agentRunOtelTraceContext(run, options.command ?? null);
  const startedAtMs = Number.isFinite(Number(options.startTimeMs)) ? Number(options.startTimeMs) : Date.now();
  const endedAtMs = Number.isFinite(Number(options.endTimeMs)) ? Number(options.endTimeMs) : Date.now();
  const spanId = nonZeroHex(randomBytes(8).toString("hex"), ZERO_SPAN_ID);
  // Status code 2 is sent when an error status or error value is supplied, otherwise 1.
  const statusCode = options.status === "error" || options.error ? 2 : 1;
  const body = {
    resourceSpans: [{
      resource: { attributes: attributesFromRecord(resourceAttributes(env, run)) },
      scopeSpans: [{
        scope: { name: "agentrun.manager", version: "1" },
        spans: [{
          traceId: context.traceId,
          spanId,
          parentSpanId: context.parentSpanId,
          name,
          kind: Number(options.kind ?? 1),
          startTimeUnixNano: unixNano(startedAtMs),
          // Never report an end time that precedes the start time.
          endTimeUnixNano: unixNano(Math.max(startedAtMs, endedAtMs)),
          attributes: attributesFromRecord({
            traceId: context.businessTraceId,
            "otel.trace_id": context.traceId,
            "agentrun.stage": name,
            runId: run?.id ?? null,
            commandId: options.command?.id ?? null,
            sessionId: run?.sessionRef?.sessionId ?? null,
            // Spread last so caller-supplied attributes override the built-in ones.
            ...options.attributes,
          }),
          status: {
            code: statusCode,
            ...(options.error ? { message: String((options.error as Error)?.message ?? options.error).slice(0, 300) } : {}),
          },
        }],
      }],
    }],
  };
  const controller = new AbortController();
  const timeout = setTimeout(() => controller.abort(), OTLP_TIMEOUT_MS);
  try {
    const response = await fetch(endpoint, {
      method: "POST",
      headers: { "content-type": "application/json" },
      body: JSON.stringify(body),
      signal: controller.signal,
    });
    const summary: JsonRecord = { ok: response.ok, status: response.status, valuesPrinted: false };
    // Only the status is needed, so discard the response body explicitly.
    // An unread body keeps the underlying connection held until it is
    // garbage collected, which adds up for a per-request fire-and-forget call.
    try {
      await response.body?.cancel();
    } catch {
      // The outcome is already known; a failed cancel must not change it.
    }
    return summary;
  } catch (error) {
    return { ok: false, error: (error as Error)?.name === "AbortError" ? "otlp-timeout" : "otlp-send-failed", valuesPrinted: false };
  } finally {
    clearTimeout(timeout);
  }
}
|
||||
|
||||
/**
 * Works out the URL that spans are POSTed to.
 *
 * A traces-specific endpoint variable (AGENTRUN_-prefixed first, then the
 * plain OTEL_ one) is used as-is apart from trailing slashes being removed.
 * Otherwise a base endpoint variable (same precedence) gets `/v1/traces`
 * appended after its trailing slashes are removed.
 *
 * @param env - Environment to read the endpoint variables from.
 * @returns The endpoint URL, or `null` when none of the variables is set.
 */
function resolveOtlpTracesEndpoint(env: NodeJS.ProcessEnv): string | null {
  const stripTrailingSlashes = (url: string): string => url.replace(/\/+$/u, "");
  const tracesEndpoint = firstNonEmpty(env.AGENTRUN_OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, env.OTEL_EXPORTER_OTLP_TRACES_ENDPOINT);
  if (tracesEndpoint) {
    return stripTrailingSlashes(tracesEndpoint);
  }
  const baseEndpoint = firstNonEmpty(env.AGENTRUN_OTEL_EXPORTER_OTLP_ENDPOINT, env.OTEL_EXPORTER_OTLP_ENDPOINT);
  if (!baseEndpoint) {
    return null;
  }
  return `${stripTrailingSlashes(baseEndpoint)}/v1/traces`;
}
|
||||
|
||||
/**
 * Collects the resource-level attributes attached to every exported span.
 *
 * Each attribute is the first non-empty value among its environment
 * variables, ending in a literal fallback (`"agentrun-manager"` for the
 * service name, `"unknown"` for everything else). `hwlab.lane` additionally
 * considers the `hwlabLane` field of the run's trace sink.
 *
 * @param env - Environment to read the attribute values from.
 * @param run - Run whose trace sink may provide the lane.
 * @returns Attribute names mapped to their resolved values.
 */
function resourceAttributes(env: NodeJS.ProcessEnv, run: RunRecord | null | undefined): JsonRecord {
  const laneFromRun = stringValue(asRecord(run?.traceSink)?.hwlabLane);
  const serviceName = firstNonEmpty(env.OTEL_SERVICE_NAME, "agentrun-manager");
  const deploymentEnvironment = firstNonEmpty(env.AGENTRUN_LANE, "unknown");
  const node = firstNonEmpty(env.UNIDESK_NODE_ID, env.AGENTRUN_NODE_ID, "unknown");
  const lane = firstNonEmpty(env.HWLAB_RUNTIME_LANE, laneFromRun, "unknown");
  const namespace = firstNonEmpty(env.POD_NAMESPACE, env.AGENTRUN_RUNTIME_NAMESPACE, "unknown");
  const commit = firstNonEmpty(env.AGENTRUN_SOURCE_COMMIT, "unknown");
  return {
    "service.name": serviceName,
    "deployment.environment": deploymentEnvironment,
    "unidesk.node": node,
    "hwlab.lane": lane,
    "k8s.namespace.name": namespace,
    "git.commit": commit,
  };
}
|
||||
|
||||
/**
 * Turns a plain record into a list of `{ key, value }` attribute entries.
 *
 * Entries whose value is `null` or `undefined` are left out; every other
 * value is wrapped by `otlpAnyValue`.
 *
 * @param record - Attribute names mapped to raw values.
 * @returns One entry per retained key, in the record's own key order.
 */
function attributesFromRecord(record: JsonRecord): Array<{ key: string; value: JsonRecord }> {
  const attributes: Array<{ key: string; value: JsonRecord }> = [];
  for (const [key, raw] of Object.entries(record)) {
    if (raw === undefined || raw === null) continue;
    attributes.push({ key, value: otlpAnyValue(raw) });
  }
  return attributes;
}
|
||||
|
||||
/**
 * Wraps a raw value in the matching typed-value object for the span payload.
 *
 * - booleans become `{ boolValue }`
 * - integers that fit a signed 64-bit range become `{ intValue }` as a decimal string
 * - every other finite number becomes `{ doubleValue }`
 * - strings become `{ stringValue }` unchanged
 * - anything else (objects, arrays, non-finite numbers, ...) becomes
 *   `{ stringValue }` holding its `JSON.stringify` output
 *
 * @param value - Raw attribute value.
 * @returns A single-key record describing the value.
 */
function otlpAnyValue(value: unknown): JsonRecord {
  if (typeof value === "boolean") return { boolValue: value };
  if (typeof value === "number" && Number.isFinite(value)) {
    // Only integers inside the int64 range may be sent as intValue. Larger
    // magnitudes are still "integers" to Number.isInteger, but String() would
    // render them in exponent notation (e.g. "1e+21"), which is not a valid
    // decimal int64 string, so they are sent as doubles instead.
    const fitsInt64 = Number.isInteger(value) && Math.abs(value) < 2 ** 63;
    return fitsInt64 ? { intValue: String(value) } : { doubleValue: value };
  }
  return { stringValue: typeof value === "string" ? value : JSON.stringify(value) };
}
|
||||
|
||||
/**
 * Converts a millisecond timestamp to a nanosecond count rendered as a string.
 *
 * @param ms - Timestamp in milliseconds.
 * @returns The value multiplied by one million, truncated toward zero, as text.
 */
function unixNano(ms: number): string {
  const nanosPerMilli = 1e6;
  const nanos = Math.trunc(ms * nanosPerMilli);
  return `${nanos}`;
}
|
||||
|
||||
/**
 * Picks the first argument that still has content after trimming.
 *
 * `null` and `undefined` count as empty.
 *
 * @param values - Candidates in priority order.
 * @returns The trimmed text of the first non-blank candidate, or `null` if all are blank.
 */
function firstNonEmpty(...values: Array<string | null | undefined>): string | null {
  const firstWithContent = values
    .map((candidate) => String(candidate ?? "").trim())
    .find((trimmed) => trimmed.length > 0);
  return firstWithContent ?? null;
}
|
||||
|
||||
/**
 * Narrows an unknown value to trimmed, non-blank text.
 *
 * @param value - Value of any type.
 * @returns The trimmed string, or `null` when the value is not a string or is blank.
 */
function stringValue(value: unknown): string | null {
  if (typeof value !== "string") return null;
  const trimmed = value.trim();
  return trimmed ? trimmed : null;
}
|
||||
|
||||
/**
 * Narrows an unknown value to a record.
 *
 * @param value - Value of any type.
 * @returns The same object when it is a non-null, non-array object; otherwise `null`.
 */
function asRecord(value: unknown): JsonRecord | null {
  if (value === null || typeof value !== "object" || Array.isArray(value)) {
    return null;
  }
  return value as JsonRecord;
}
|
||||
|
||||
/**
 * Guards against an id that is malformed or equal to the all-zero id.
 *
 * @param value - Candidate id; accepted only if it is non-empty lowercase hex.
 * @param zero - The all-zero id the candidate must differ from.
 * @returns `value` when acceptable, otherwise `zero` with its final `0` replaced by `1`.
 */
function nonZeroHex(value: string, zero: string): string {
  const isLowerHex = /^[0-9a-f]+$/u.test(value);
  if (isLowerHex && value !== zero) {
    return value;
  }
  return zero.replace(/0$/u, "1");
}
|
||||
+49
-8
@@ -20,6 +20,7 @@ import { getProviderProfileConfig, getProviderProfileValidation, listBackendCapa
|
||||
import { listToolCredentials, setGithubSshToolCredential, showToolCredential } from "./tool-credentials.js";
|
||||
import { aipodSpecFromInput, applyAipodSpec, deleteAipodSpec, listAipodSpecs, renderAipodSpecByName, showAipodSpec } from "../common/aipod-specs.js";
|
||||
import { staticWorkReadyCapabilitySummary } from "../common/work-ready.js";
|
||||
import { emitAgentRunOtelSpan } from "../common/otel-trace.js";
|
||||
|
||||
function pvcOptions(defaults: { kubectlCommand?: string } | undefined): SessionPvcOptions {
|
||||
return defaults?.kubectlCommand ? { kubectlCommand: defaults.kubectlCommand } : {};
|
||||
@@ -652,17 +653,36 @@ async function route({ method, url, body, store, sourceCommit, authSummary, runn
|
||||
await refreshRunningQueueTasksForRead(store, queue);
|
||||
return await queueCommanderForRead(store, queue, url.searchParams.get("readerId"));
|
||||
}
|
||||
if (method === "POST" && path === "/api/v1/runs") return await store.createRun(validateCreateRun(body)) as unknown as JsonValue;
|
||||
if (method === "POST" && path === "/api/v1/runs") {
|
||||
const startedAt = Date.now();
|
||||
const run = await store.createRun(validateCreateRun(body));
|
||||
void emitAgentRunOtelSpan("run_created", run, process.env, { startTimeMs: startedAt, kind: 2, attributes: { "http.method": "POST", "http.route": "/api/v1/runs", "http.status_code": 200, backendProfile: run.backendProfile, providerId: run.providerId } });
|
||||
return run as unknown as JsonValue;
|
||||
}
|
||||
const runMatch = path.match(/^\/api\/v1\/runs\/([^/]+)$/u);
|
||||
if (method === "GET" && runMatch) return await store.getRun(runMatch[1] ?? "") as unknown as JsonValue;
|
||||
const eventMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/events$/u);
|
||||
if (method === "GET" && eventMatch) {
|
||||
const startedAt = Date.now();
|
||||
const runId = eventMatch[1] ?? "";
|
||||
const afterSeq = integerQuery(url, "afterSeq", 0);
|
||||
const limit = integerQuery(url, "limit", 100);
|
||||
return { items: await store.listEvents(eventMatch[1] ?? "", afterSeq, limit) as unknown as JsonValue };
|
||||
const run = await store.getRun(runId);
|
||||
const items = await store.listEvents(runId, afterSeq, limit);
|
||||
void emitAgentRunOtelSpan("projection_sync", run, process.env, { startTimeMs: startedAt, kind: 2, attributes: { "http.method": "GET", "http.route": "/api/v1/runs/:runId/events", "http.status_code": 200, afterSeq, limit, eventCount: items.length } });
|
||||
return { items: items as unknown as JsonValue };
|
||||
}
|
||||
const runResultMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/result$/u);
|
||||
if (method === "GET" && runResultMatch) return await buildRunResult(store, runResultMatch[1] ?? "", url.searchParams.get("commandId") ?? undefined) as JsonValue;
|
||||
if (method === "GET" && runResultMatch) {
|
||||
const startedAt = Date.now();
|
||||
const runId = runResultMatch[1] ?? "";
|
||||
const run = await store.getRun(runId);
|
||||
const commandId = url.searchParams.get("commandId") ?? undefined;
|
||||
const command = commandId ? await store.getCommand(commandId) : null;
|
||||
const result = await buildRunResult(store, runId, commandId) as JsonValue;
|
||||
void emitAgentRunOtelSpan("command_result", run, process.env, { command, startTimeMs: startedAt, kind: 2, attributes: { "http.method": "GET", "http.route": "/api/v1/runs/:runId/result", "http.status_code": 200, terminalStatus: typeof result === "object" && result !== null && !Array.isArray(result) ? (result as JsonRecord).terminalStatus ?? null : null } });
|
||||
return result;
|
||||
}
|
||||
const runCancelMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/cancel$/u);
|
||||
if (method === "POST" && runCancelMatch) {
|
||||
const record = body === null ? {} : asRecord(body, "cancel");
|
||||
@@ -670,16 +690,29 @@ async function route({ method, url, body, store, sourceCommit, authSummary, runn
|
||||
return await store.cancelRun(runCancelMatch[1] ?? "", reason) as unknown as JsonValue;
|
||||
}
|
||||
const commandCreateMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands$/u);
|
||||
if (method === "POST" && commandCreateMatch) return await store.createCommand(commandCreateMatch[1] ?? "", validateCreateCommand(body)) as unknown as JsonValue;
|
||||
if (method === "POST" && commandCreateMatch) {
|
||||
const startedAt = Date.now();
|
||||
const runId = commandCreateMatch[1] ?? "";
|
||||
const command = await store.createCommand(runId, validateCreateCommand(body));
|
||||
const run = await store.getRun(runId);
|
||||
void emitAgentRunOtelSpan("command_created", run, process.env, { command, startTimeMs: startedAt, kind: 2, attributes: { "http.method": "POST", "http.route": "/api/v1/runs/:runId/commands", "http.status_code": 200, commandType: command.type, commandState: command.state } });
|
||||
return command as unknown as JsonValue;
|
||||
}
|
||||
if (method === "GET" && commandCreateMatch) return { items: await store.listCommands(commandCreateMatch[1] ?? "", integerQuery(url, "afterSeq", 0), integerQuery(url, "limit", 20)) as unknown as JsonValue };
|
||||
const runnerJobMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/runner-jobs$/u);
|
||||
if (method === "POST" && runnerJobMatch) {
|
||||
return await createKubernetesRunnerJob({
|
||||
const startedAt = Date.now();
|
||||
const runId = runnerJobMatch[1] ?? "";
|
||||
const runnerJob = await createKubernetesRunnerJob({
|
||||
store,
|
||||
runId: runnerJobMatch[1] ?? "",
|
||||
runId,
|
||||
input: asRecord(body ?? {}, "runnerJob") as never,
|
||||
defaults: runnerJobDefaultsForRequest(runnerJobDefaults, sourceCommit),
|
||||
}) as unknown as JsonValue;
|
||||
}) as unknown as JsonRecord;
|
||||
const run = await store.getRun(runId);
|
||||
const command = typeof runnerJob.commandId === "string" ? await store.getCommand(runnerJob.commandId) : null;
|
||||
void emitAgentRunOtelSpan("runner_job_created", run, process.env, { command, startTimeMs: startedAt, kind: 2, attributes: { "http.method": "POST", "http.route": "/api/v1/runs/:runId/runner-jobs", "http.status_code": 200, jobName: typeof runnerJob.jobName === "string" ? runnerJob.jobName : null, namespace: typeof runnerJob.namespace === "string" ? runnerJob.namespace : null } });
|
||||
return runnerJob as unknown as JsonValue;
|
||||
}
|
||||
if (method === "GET" && runnerJobMatch) {
|
||||
const runId = runnerJobMatch[1] ?? "";
|
||||
@@ -700,7 +733,15 @@ async function route({ method, url, body, store, sourceCommit, authSummary, runn
|
||||
const commandShowMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands\/([^/]+)$/u);
|
||||
if (method === "GET" && commandShowMatch) return await store.getCommand(commandShowMatch[2] ?? "") as unknown as JsonValue;
|
||||
const commandResultMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/commands\/([^/]+)\/result$/u);
|
||||
if (method === "GET" && commandResultMatch) return await buildRunResult(store, commandResultMatch[1] ?? "", commandResultMatch[2] ?? "") as JsonValue;
|
||||
if (method === "GET" && commandResultMatch) {
|
||||
const startedAt = Date.now();
|
||||
const runId = commandResultMatch[1] ?? "";
|
||||
const commandId = commandResultMatch[2] ?? "";
|
||||
const [run, command] = await Promise.all([store.getRun(runId), store.getCommand(commandId)]);
|
||||
const result = await buildRunResult(store, runId, commandId) as JsonValue;
|
||||
void emitAgentRunOtelSpan("command_result", run, process.env, { command, startTimeMs: startedAt, kind: 2, attributes: { "http.method": "GET", "http.route": "/api/v1/runs/:runId/commands/:commandId/result", "http.status_code": 200, terminalStatus: typeof result === "object" && result !== null && !Array.isArray(result) ? (result as JsonRecord).terminalStatus ?? null : null } });
|
||||
return result;
|
||||
}
|
||||
if (method === "POST" && path === "/api/v1/runners/register") return await store.registerRunner(asRecord(body ?? {}, "runner")) as unknown as JsonValue;
|
||||
const claimMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/claim$/u);
|
||||
if (method === "POST" && claimMatch) {
|
||||
|
||||
Reference in New Issue
Block a user