diff --git a/.agents/skills/unidesk-otel/SKILL.md b/.agents/skills/unidesk-otel/SKILL.md index 6b87f2ac..d57b332c 100644 --- a/.agents/skills/unidesk-otel/SKILL.md +++ b/.agents/skills/unidesk-otel/SKILL.md @@ -88,6 +88,28 @@ OTel trace 内常见业务关联属性: 追 `Code Agent 代理暂时无法连接上游`、`provider-stream-disconnected`、Workbench 加载/转圈、turn idle 报错、AgentRun command terminal 状态时: +优先用一条诊断命令汇总业务 trace、OTel trace、服务追穿、AgentRun 终态、HWLAB 读模型和 HTTP 403/401/5xx 根因: + +```bash +bun scripts/cli.ts platform-infra observability diagnose-code-agent \ + --target D601 \ + --business-trace-id <trc_id> +``` + +默认输出必须保持有界低噪声,重点看: + +- `mapping.businessTraceId` / `mapping.otelTraceId` +- `servicePath` 是否同时到达 `hwlab-cloud-api`、`agentrun-manager`、`agentrun-runner` +- `identity` 里的 `runId`、`commandId`、`sessionId`、`runnerJobId`、`runnerId`、`backendProfile`、`sourceCommit` +- `agentrun.terminalStatus`、`terminalEventType`、`runnerProviderClassification` +- `hwlabReadModel.sourceEventCount`、`requestedSinceSeq`、`turnStatusCounts` +- `http.problemCounts` 和 `projectionLag.status` +- `summary.rootCause` 与按置信度排序的 `rootCauseCandidates` + +只有需要展开 span 明细时使用 `--full`;只有排查 Tempo raw 响应或 CLI 解析器时使用 `--raw`。默认输出不得包含 Secret、Authorization header、DSN、可复制凭据或完整运行 transcript。 + +若没有业务 traceId 或诊断结果还需要 drill-down,再使用低层 trace/search: + 1. 先确认 OTel backend ready: `bun scripts/cli.ts platform-infra observability status --target D601` 2. 
查业务 trace 对应 OTel trace: diff --git a/scripts/src/platform-infra-observability.ts b/scripts/src/platform-infra-observability.ts index cf0763ec..97e1da13 100644 --- a/scripts/src/platform-infra-observability.ts +++ b/scripts/src/platform-infra-observability.ts @@ -138,9 +138,17 @@ interface SearchOptions extends CommonOptions { lookbackMinutes: number; }
+interface DiagnoseCodeAgentOptions extends CommonOptions { // Parsed flags of the `diagnose-code-agent` action.
+  businessTraceId: string | null; // From --business-trace-id / --trace; must match /^trc_[A-Za-z0-9_-]+$/. null when the flag is absent.
+  traceId: string | null; // From --trace-id / --otel-trace-id; 32 hex characters, stored lower-cased. null when the flag is absent.
+  limit: number; // From --limit; integer 1..200, default 40.
+  candidateLimit: number; // From --candidate-limit; integer 1..100, default 20. Sent as the `limit` parameter of the trace search request.
+  lookbackMinutes: number; // From --lookback-minutes / --since-minutes; integer 1..10080, default 10080. Determines the search window start.
+}
+
 export function observabilityHelp(): Record { return { - command: "platform-infra observability plan|apply|status|validate|trace|search", + command: "platform-infra observability plan|apply|status|validate|trace|search|diagnose-code-agent", output: "json", configTruth: "config/platform-infra/observability.yaml", spec: "PJ2026-01060501 OTel追踪 draft-2026-06-19-p0", @@ -152,6 +160,7 @@ export function observabilityHelp(): Record { "bun scripts/cli.ts platform-infra observability validate --target D601 [--full|--raw]", "bun scripts/cli.ts platform-infra observability trace --target D601 --trace-id [--grep provider-stream-disconnected] [--limit 40] [--full|--raw]", "bun scripts/cli.ts platform-infra observability search --target D601 --grep 'no rollout found' [--lookback-minutes 360] [--candidate-limit 80] [--limit 20] [--full|--raw]", + "bun scripts/cli.ts platform-infra observability diagnose-code-agent --target D601 --business-trace-id [--full|--raw]", ], boundary: "Prometheus remains the metrics source; this command owns only platform-infra OTel Collector, trace backend readiness, and trace lookup.", }; @@ -165,6 +174,7 @@ export async function runPlatformObservabilityCommand(config: UniDeskConfig, arg if (action === "validate") return await validate(config, parseCommonOptions(args.slice(1))); if (action === "trace") return await trace(config, parseTraceOptions(args.slice(1))); if (action === "search") return await 
search(config, parseSearchOptions(args.slice(1))); + if (action === "diagnose-code-agent") return await diagnoseCodeAgent(config, parseDiagnoseCodeAgentOptions(args.slice(1))); return { ok: false, error: "unsupported-platform-infra-observability-command", args, help: observabilityHelp() }; } @@ -302,6 +312,60 @@ function parseSearchOptions(args: string[]): SearchOptions { return { ...parseCommonOptions(commonArgs), grep, query, limit, candidateLimit, lookbackMinutes }; }
+/**
+ * Parses `diagnose-code-agent` arguments; anything not handled here (and the token after `--target`) is forwarded to parseCommonOptions.
+ * @throws Error when a flag lacks a value, a value fails validation, or neither trace id flag is supplied.
+ */
+function parseDiagnoseCodeAgentOptions(args: string[]): DiagnoseCodeAgentOptions {
+  const commonArgs: string[] = [];
+  let businessTraceId: string | null = null;
+  let traceId: string | null = null;
+  let limit = 40;
+  let candidateLimit = 20;
+  let lookbackMinutes = 10080;
+  const valueAfter = (index: number, flag: string): string => { // token after args[index]; a missing token or another `--flag` counts as "no value"
+    const value = args[index + 1];
+    if (value === undefined || value.startsWith("--")) throw new Error(`${flag} requires a value`);
+    return value;
+  };
+  const boundedIntAfter = (index: number, flag: string, max: number): number => { // value after args[index] as an integer in [1, max]
+    const parsed = Number(valueAfter(index, flag));
+    if (!Number.isInteger(parsed) || parsed < 1 || parsed > max) throw new Error(`${flag} must be an integer from 1 to ${max}`);
+    return parsed;
+  };
+  for (let index = 0; index < args.length; index += 1) {
+    const arg = args[index];
+    if (arg === "--business-trace-id" || arg === "--trace") {
+      const value = valueAfter(index, arg);
+      if (!/^trc_[A-Za-z0-9_-]+$/u.test(value)) throw new Error(`${arg} must look like trc_`);
+      businessTraceId = value;
+      index += 1;
+    } else if (arg === "--trace-id" || arg === "--otel-trace-id") {
+      const value = valueAfter(index, arg);
+      if (!/^[0-9a-fA-F]{32}$/u.test(value)) throw new Error(`${arg} must be a 32-character OpenTelemetry trace id encoded as hex`);
+      traceId = value.toLowerCase();
+      index += 1;
+    } else if (arg === "--limit") {
+      limit = boundedIntAfter(index, arg, 200);
+      index += 1;
+    } else if (arg === "--candidate-limit") {
+      candidateLimit = boundedIntAfter(index, arg, 100);
+      index += 1;
+    } else if (arg === "--lookback-minutes" || arg === "--since-minutes") {
+      lookbackMinutes = boundedIntAfter(index, arg, 10080);
+      index += 1;
+    } else {
+      commonArgs.push(arg);
+      if (arg === "--target") { // forward the target value untouched and skip it so it is never matched against the flags above
+        commonArgs.push(args[index + 1] ?? "");
+        index += 1;
+      }
+    }
+  }
+  if (businessTraceId === null && traceId === null) throw new Error("observability diagnose-code-agent requires --business-trace-id or --trace-id ");
+  return { ...parseCommonOptions(commonArgs), businessTraceId, traceId, limit, candidateLimit, lookbackMinutes };
+}
+
 function readObservabilityConfig(): ObservabilityConfig { const parsed = Bun.YAML.parse(readFileSync(configFile, "utf8")) as unknown; const root = asRecord(parsed, configLabel); @@ -611,6 +675,42 @@ async function search(config: UniDeskConfig, options: SearchOptions): Promise> { + const observability = readObservabilityConfig(); + const target = resolveTarget(observability, options.targetId); + const endSeconds = Math.floor(Date.now() / 1000); + const startSeconds = endSeconds - (options.lookbackMinutes * 60); + let searchPath: string | null = null; + if (options.traceId === null && options.businessTraceId !== null) { + const params = new URLSearchParams({ + start: String(startSeconds), + end: String(endSeconds), + limit: String(options.candidateLimit), + q: `{ .traceId = "${options.businessTraceId}" }`, + }); + searchPath = `/api/search?${params.toString()}`; + } + const result = await 
capture(config, target.route, ["sh"], diagnoseCodeAgentScript(observability, target, searchPath, options)); + const parsed = parseJsonOutput(result.stdout); + return { + ok: result.exitCode === 0 && parsed?.ok === true, + action: "platform-infra-observability-diagnose-code-agent", + mutation: false, + target: targetSummary(target), + query: { + backend: observability.traceBackend.type, + service: observability.traceBackend.serviceName, + businessTraceId: options.businessTraceId, + traceId: options.traceId, + path: searchPath, + lookbackMinutes: options.lookbackMinutes, + candidateLimit: options.candidateLimit, + limit: options.limit, + }, + result: redactSensitiveUnknown(parsed) ?? compactCapture(result, { full: true }), + }; +} + function renderManifest(observability: ObservabilityConfig, target: ObservabilityTarget): string { const collectorImage = imageReference(observability.images.collector); const tempoImage = imageReference(observability.images.tempo); @@ -1748,6 +1848,767 @@ PY `; } +function diagnoseCodeAgentScript(observability: ObservabilityConfig, target: ObservabilityTarget, searchPath: string | null, options: DiagnoseCodeAgentOptions): string { + const proxyPrefix = `/api/v1/namespaces/${target.namespace}/services/http:${observability.traceBackend.serviceName}:http/proxy`; + const searchProxyPath = searchPath === null ? null : `${proxyPrefix}${searchPath}`; + const traceIdLiteral = options.traceId === null ? "None" : JSON.stringify(options.traceId); + const businessTraceIdLiteral = options.businessTraceId === null ? "None" : JSON.stringify(options.businessTraceId); + const searchPathLiteral = searchPath === null ? "None" : JSON.stringify(searchPath); + const searchProxyPathLiteral = searchProxyPath === null ? "None" : JSON.stringify(searchProxyPath); + const businessTraceIdForNext = options.businessTraceId ?? 
""; + return ` +set -u +python3 - <<'PY' +import collections, json, subprocess, time + +BUSINESS_TRACE_ID = ${businessTraceIdLiteral} +TRACE_ID = ${traceIdLiteral} +SEARCH_PATH = ${searchPathLiteral} +SEARCH_PROXY_PATH = ${searchProxyPathLiteral} +TRACE_PROXY_PREFIX = ${JSON.stringify(proxyPrefix)} +TRACE_PATH_TEMPLATE = ${JSON.stringify(observability.probes.traceQueryPathTemplate)} +FULL = ${options.full ? "True" : "False"} +RAW = ${options.raw ? "True" : "False"} +LIMIT = ${options.limit} +DEADLINE = time.time() + 50 +IMPORTANT_ATTRS = [ + "traceId", "otel.trace_id", "agentrun.stage", "runId", "commandId", + "sessionId", "turnId", "threadId", "runnerJobId", "failureKind", "willRetry", + "terminalStatus", "phase", "message", "http.route", "http.status_code", + "http.method", "eventType", "runnerId", "attemptId", "backendProfile", + "sourceCommit", "jobName", "podName", "logPath", "storageKind", + "storagePvcName", "storagePvcPhase", "resumeMode", "rolloutId", + "returnedEvents", "sinceSeq", "afterSeq", "limit", "fromSeq", "toSeq", + "totalEvents", "hasMore", "fullTraceLoaded", "rawEventCount", + "maxSeq", "traceLastSeq", "endSeq", "commandFiltered", + "seq", "eventSeq", "sourceSeq", "sourceLatestSeq", "latestSeq", + "lastSeq", "sourceEventCount", "projectedSeq", "lastProjectedSeq", + "status", "turnStatus", "error", "error.message", "exception.type", + "exception.message", "valuesPrinted", +] +IDENTITY_KEYS = [ + "runId", "commandId", "sessionId", "turnId", "threadId", + "runnerJobId", "runnerId", "attemptId", "backendProfile", + "sourceCommit", "jobName", "podName", +]
+
+def run_kubectl(proxy_path, timeout=15):  # returns (exit code, stdout text, stderr text); 124 = timed out, 125 = subprocess.run raised
+    try:
+        proc = subprocess.run(["kubectl", "get", "--raw", proxy_path], capture_output=True, text=True, timeout=timeout)
+        return proc.returncode, proc.stdout, proc.stderr
+    except subprocess.TimeoutExpired as exc:
+        # Output captured before a timeout is bytes even with text=True (or None when nothing was read);
+        # decode it so callers can keep slicing, .encode()-ing and json.dumps()-ing the body as str.
+        partial = exc.stdout or ""
+        if isinstance(partial, bytes):
+            partial = partial.decode("utf-8", "replace")
+        return 124, partial, "timeout while querying tempo"
+    except Exception as exc:
+        return 125, "", str(exc)
+
+def 
parse_json(body):  # json.loads(body); None when body is empty/falsy or cannot be parsed
+    try:
+        return json.loads(body) if body else None
+    except Exception:
+        return None
+
+def attr_value(value):  # unwraps a typed attribute value such as {"stringValue": ...} into the bare value
+    if not isinstance(value, dict):
+        return value
+    inner = value.get("value", value)  # accepts either {"value": {...}} or the typed dict itself
+    if not isinstance(inner, dict):
+        return inner
+    for key in ("stringValue", "intValue", "doubleValue", "boolValue"):
+        if key in inner:
+            return inner.get(key)
+    if "arrayValue" in inner:  # array, kvlist and bytes values are not expanded; an empty string is returned for them
+        return ""
+    if "kvlistValue" in inner:
+        return ""
+    if "bytesValue" in inner:
+        return ""
+    return inner
+
+def attrs_to_dict(attrs):  # list of {"key": ..., "value": ...} items -> {key: unwrapped value}; non-list input gives {}
+    if not isinstance(attrs, list):
+        return {}
+    output = {}
+    for item in attrs:
+        if not isinstance(item, dict):
+            continue
+        key = item.get("key")
+        if isinstance(key, str):
+            output[key] = attr_value(item.get("value"))
+    return output
+
+def selected_attrs(attrs):  # subset of attrs limited to the keys listed in IMPORTANT_ATTRS
+    output = {}
+    for key in IMPORTANT_ATTRS:
+        if key in attrs:
+            output[key] = attrs[key]
+    return output
+
+def to_int(value):  # best-effort integer coercion; None when the value cannot be read as an integer
+    if value is None:
+        return None
+    if isinstance(value, bool):  # tested before int because bool is a subclass of int
+        return 1 if value else 0
+    if isinstance(value, int):
+        return value
+    if isinstance(value, float) and value.is_integer():  # floats with a fractional part fall through to None
+        return int(value)
+    if isinstance(value, str):
+        try:
+            return int(value.strip())
+        except Exception:
+            return None
+    return None
+
+def to_bool(value):  # bools pass through; strings true/1/yes and false/0/no (trimmed, case-insensitive) map to bool; anything else -> None
+    if isinstance(value, bool):
+        return value
+    if isinstance(value, str):
+        lowered = value.strip().lower()
+        if lowered in ("true", "1", "yes"):
+            return True
+        if lowered in ("false", "0", "no"):
+            return False
+    return None
+
+def nanos_to_ms(start, end):  # (end - start) / 1e6 rounded to 3 decimals; None when either bound is not int-convertible
+    try:
+        return round((int(end) - int(start)) / 1000000, 3)
+    except Exception:
+        return None
+
+def span_status_code(status):  # status["code"], or None when status is not a dict
+    if not isinstance(status, dict):
+        return None
+    return status.get("code")
+
+def is_error_span(span, attrs):  # heuristic error flag computed from the raw span and its attribute dict
+    status = span.get("status") if isinstance(span, dict) else {}
+    code = span_status_code(status)
+    name = str(span.get("name", "")).lower() if isinstance(span, dict) else ""
+    failure = str(attrs.get("failureKind", "")).strip()
+    terminal = str(attrs.get("terminalStatus", "")).strip()
+    return code in ("STATUS_CODE_ERROR", 2) or "error" in name or bool(failure) or terminal in ("failed", "blocked")  # any one signal is enough
+
+def batches_from_body(body):  # first list found under "batches" or "resourceSpans"; [] otherwise
+    if not isinstance(body, dict):
+        return []
+    for key in ("batches", "resourceSpans"):
+        value = body.get(key)
+        if isinstance(value, list):
+            return value
+    return []
+
+def scope_spans_from_batch(batch):  # first list found under "scopeSpans" or "instrumentationLibrarySpans"; [] otherwise
+    if not isinstance(batch, dict):
+        return []
+    for key in ("scopeSpans", "instrumentationLibrarySpans"):
+        value = batch.get(key)
+        if isinstance(value, list):
+            return value
+    return []
+
+def extract_traces(search_body):  # list under "traces", else list under "data", else []
+    if not isinstance(search_body, dict):
+        return []
+    value = search_body.get("traces")
+    if isinstance(value, list):
+        return value
+    value = search_body.get("data")
+    if isinstance(value, list):
+        return value
+    return []
+
+def trace_id_from_meta(meta):  # first non-empty string among the traceID / traceId / trace_id keys; None otherwise
+    if not isinstance(meta, dict):
+        return None
+    for key in ("traceID", "traceId", "trace_id"):
+        value = meta.get(key)
+        if isinstance(value, str) and value:
+            return value
+    return None
+
+def compact_meta(meta):  # copy of a search hit reduced to the listed summary keys; {} for non-dict input
+    if not isinstance(meta, dict):
+        return {}
+    output = {}
+    for key in ("traceID", "traceId", "rootServiceName", "rootTraceName", "startTimeUnixNano", "durationMs"):
+        if key in meta:
+            output[key] = meta[key]
+    return output
+
+def compact_span(span, service, resource_attrs, scope_name, index):  # builds the compact item for one span; also returns its full attribute dict
+    raw_attrs = attrs_to_dict(span.get("attributes"))
+    attrs = selected_attrs(raw_attrs)
+    status = span.get("status") if isinstance(span.get("status"), dict) else {}
+    item = {
+        "name": span.get("name"),
+        "service": service,
+        "scope": scope_name,
+        "statusCode": span_status_code(status),
+        "statusMessage": status.get("message"),
+        "durationMs": nanos_to_ms(span.get("startTimeUnixNano"), span.get("endTimeUnixNano")),
+        "attributes": attrs,
+        "_index": index,  # underscore-prefixed keys are sort helpers
+        "_start": to_int(span.get("startTimeUnixNano")) or index,  # falls back to index when the start time is missing or 0
+    }
+    if "deployment.environment" in resource_attrs:
+        
item["environment"] = resource_attrs.get("deployment.environment")
+    if "git.commit" in resource_attrs:
+        item["gitCommit"] = resource_attrs.get("git.commit")
+    return item, raw_attrs
+
+def public_span(item):  # copy of item without the keys that start with "_"
+    if not isinstance(item, dict):
+        return item
+    return {key: value for key, value in item.items() if not key.startswith("_")}
+
+def tiny_span(item):  # reduces a compact span to name, service and a short allow-list of attributes
+    if not isinstance(item, dict):
+        return item
+    attrs = item.get("attributes", {}) if isinstance(item.get("attributes"), dict) else {}
+    keep_attrs = {}
+    for key in (
+        "http.route", "http.status_code", "http.method", "status",
+        "terminalStatus", "eventType", "returnedEvents", "sinceSeq",
+        "fromSeq", "toSeq", "totalEvents", "hasMore", "fullTraceLoaded",
+        "afterSeq", "rawEventCount", "failureKind",
+    ):
+        if key in attrs:
+            keep_attrs[key] = attrs[key]
+    return {
+        "name": item.get("name"),
+        "service": item.get("service"),
+        "attributes": keep_attrs,
+    }
+
+def tiny_terminal(terminal):  # terminal entry with its span replaced by the tiny_span form
+    if not isinstance(terminal, dict):
+        return terminal
+    return {
+        "status": terminal.get("status"),
+        "eventType": terminal.get("eventType"),
+        "span": tiny_span(terminal.get("span")),
+    }
+
+def identity_from_spans(spans):  # one value per IDENTITY_KEYS entry, preferring spans whose service starts with "agentrun"
+    identity = {}
+    for key in IDENTITY_KEYS:
+        preferred = None
+        fallback = None
+        for item in spans:
+            attrs = item.get("attributes", {}) if isinstance(item.get("attributes"), dict) else {}
+            value = attrs.get(key)
+            if value in (None, ""):
+                continue
+            if fallback is None:  # remember the first non-empty value from any service
+                fallback = value
+            service = str(item.get("service") or "")
+            if service.startswith("agentrun"):  # the first agentrun* value wins and ends the scan for this key
+                preferred = value
+                break
+        if preferred is not None or fallback is not None:
+            identity[key] = preferred if preferred is not None else fallback
+    return identity
+
+def flatten_trace(trace_body):  # walks batches -> scope spans -> spans; returns compact spans plus per-trace aggregates
+    services = set()
+    business_trace_ids = set()
+    name_counts = collections.Counter()
+    spans = []
+    error_spans = []
+    raw_index = 0  # running position of each span in the body, passed to compact_span as index
+    for batch in batches_from_body(trace_body):
+        resource = batch.get("resource") if isinstance(batch.get("resource"), dict) else {}
+        resource_attrs = attrs_to_dict(resource.get("attributes"))
+        service = resource_attrs.get("service.name") or ""
+        services.add(service)
+        for scope_span in scope_spans_from_batch(batch):
+            scope = scope_span.get("scope") if isinstance(scope_span.get("scope"), dict) else {}
+            scope_name = scope.get("name")
+            raw_spans = scope_span.get("spans") if isinstance(scope_span.get("spans"), list) else []
+            for span in raw_spans:
+                if not isinstance(span, dict):
+                    continue
+                item, raw_attrs = compact_span(span, service, resource_attrs, scope_name, raw_index)
+                raw_index += 1
+                attrs = item.get("attributes", {}) if isinstance(item.get("attributes"), dict) else {}
+                if isinstance(attrs.get("traceId"), str):  # the "traceId" span attribute is collected as a business trace id
+                    business_trace_ids.add(attrs.get("traceId"))
+                name = str(item.get("name") or "")
+                name_counts[name] += 1
+                spans.append(item)
+                if is_error_span(span, raw_attrs):  # error detection sees the full attribute dict, not only the selected subset
+                    error_spans.append(item)
+    spans.sort(key=lambda item: (item.get("_start", 0), item.get("_index", 0)))  # order by start time; _index breaks ties
+    error_spans.sort(key=lambda item: (item.get("_start", 0), item.get("_index", 0)))
+    return {
+        "spans": spans,
+        "services": sorted(services),
+        "businessTraceIds": sorted(business_trace_ids),
+        "spanNameCounts": [{"name": name, "count": count} for name, count in name_counts.most_common(20)],  # 20 most frequent span names
+        "errorSpans": error_spans,
+    }
+
+def raw_trace_shape(trace_body):
+    batches = batches_from_body(trace_body)
+    scope_count = 0
+    span_count = 0
+    first_resource_attrs = {}
+    first_scope_keys = []
+    first_span_keys = []
+    first_span_attr_keys = []
+    for batch_index, batch in enumerate(batches):
+        resource = batch.get("resource") if isinstance(batch.get("resource"), dict) else {}
+        resource_attrs = attrs_to_dict(resource.get("attributes"))
+        if batch_index == 0:  # resource attributes are sampled from the first batch only
+            for key in ("service.name", "deployment.environment", "git.commit"):
+                if key in resource_attrs:
+                    first_resource_attrs[key] = resource_attrs[key]
+        for scope_span in scope_spans_from_batch(batch):
+            
scope_count += 1 + if not first_scope_keys and isinstance(scope_span, dict): + first_scope_keys = sorted(scope_span.keys()) + raw_spans = scope_span.get("spans") if isinstance(scope_span.get("spans"), list) else [] + span_count += len(raw_spans) + if not first_span_keys and raw_spans and isinstance(raw_spans[0], dict): + first_span_keys = sorted(raw_spans[0].keys()) + first_span_attr_keys = sorted(attrs_to_dict(raw_spans[0].get("attributes")).keys())[:50] + return { + "mode": "bounded-raw-shape", + "note": "Full Tempo body is intentionally not printed to avoid secrets/raw transcript and SSH 60s output timeouts.", + "topLevelKeys": sorted(trace_body.keys()) if isinstance(trace_body, dict) else [], + "resourceSpanCount": len(batches), + "scopeSpanCount": scope_count, + "spanCount": span_count, + "firstResourceAttributes": first_resource_attrs, + "firstScopeKeys": first_scope_keys, + "firstSpanKeys": first_span_keys, + "firstSpanAttributeKeys": first_span_attr_keys, + } + +def http_status_summary(spans): + grouped = collections.Counter() + problem_spans = [] + for item in spans: + attrs = item.get("attributes", {}) if isinstance(item.get("attributes"), dict) else {} + code = to_int(attrs.get("http.status_code")) + if code is None: + continue + method = attrs.get("http.method") or "" + route = attrs.get("http.route") or item.get("name") or "" + grouped[(str(method), str(route), code)] += 1 + if code in (401, 403) or code >= 500: + problem_spans.append(item) + status_counts = [ + {"method": method or None, "route": route, "status": status, "count": count} + for (method, route, status), count in grouped.most_common() + ] + problem_counts = [ + {"method": method or None, "route": route, "status": status, "count": count} + for (method, route, status), count in grouped.most_common() + if status in (401, 403) or status >= 500 + ] + return { + "statusCounts": status_counts[:20], + "problemCounts": problem_counts[:20], + "problemSpans": problem_spans, + "actorForbidden": 
any(to_int((item.get("attributes") or {}).get("http.status_code")) == 403 and item.get("service") == "hwlab-cloud-api" for item in problem_spans), + } + +def agentrun_summary(spans): + terminal_spans = [] + latest_seq = None + seq_keys = ("maxSeq", "traceLastSeq", "endSeq", "seq", "eventSeq", "sourceSeq", "sourceLatestSeq", "latestSeq", "lastSeq") + for item in spans: + service = str(item.get("service") or "") + name = str(item.get("name") or "") + attrs = item.get("attributes", {}) if isinstance(item.get("attributes"), dict) else {} + if service.startswith("agentrun"): + for key in seq_keys: + value = to_int(attrs.get(key)) + if value is not None: + latest_seq = value if latest_seq is None else max(latest_seq, value) + terminal_status = attrs.get("terminalStatus") + if terminal_status or name.startswith("runner_terminal."): + derived = terminal_status + if not derived and name.startswith("runner_terminal."): + derived = name.split(".", 1)[1] + terminal_spans.append({ + "status": derived, + "eventType": attrs.get("eventType"), + "span": public_span(item), + }) + terminal_status = terminal_spans[-1]["status"] if terminal_spans else None + terminal_event_type = None + for terminal in reversed(terminal_spans): + if terminal.get("eventType"): + terminal_event_type = terminal.get("eventType") + break + if terminal_status in ("completed", "succeeded", "success"): + runner_classification = "completed" + elif terminal_status in ("failed", "error", "timeout", "cancelled"): + runner_classification = "failed" + elif terminal_status in ("blocked",): + runner_classification = "blocked" + elif any(str(item.get("service") or "") == "agentrun-runner" for item in spans): + runner_classification = "running" + else: + runner_classification = "unknown" + return { + "terminalStatus": terminal_status, + "latestSeq": latest_seq, + "terminalEventType": terminal_event_type, + "terminalSpans": terminal_spans, + "runnerProviderClassification": runner_classification, + } + +def 
hwlab_read_model_summary(spans): + trace_read_spans = [] + turn_read_spans = [] + projection_spans = [] + source_event_count = None + requested_since_seq = None + latest_projected_seq = None + turn_statuses = [] + turn_status_counts = collections.Counter() + for item in spans: + if item.get("service") != "hwlab-cloud-api": + continue + name = str(item.get("name") or "") + attrs = item.get("attributes", {}) if isinstance(item.get("attributes"), dict) else {} + if name == "trace_events_read": + trace_read_spans.append(public_span(item)) + for key in ("totalEvents", "sourceEventCount"): + value = to_int(attrs.get(key)) + if value is not None: + source_event_count = value if source_event_count is None else max(source_event_count, value) + for key in ("sinceSeq", "fromSeq"): + value = to_int(attrs.get(key)) + if value is not None: + requested_since_seq = value if requested_since_seq is None else max(requested_since_seq, value) + if name == "turn_status_read": + turn_read_spans.append(public_span(item)) + if attrs.get("turnStatus") is not None or attrs.get("status") is not None: + status_value = attrs.get("turnStatus") if attrs.get("turnStatus") is not None else attrs.get("status") + turn_statuses.append(status_value) + turn_status_counts[str(status_value)] += 1 + if name == "projection_sync": + projection_spans.append(public_span(item)) + after_seq = to_int(attrs.get("afterSeq")) + raw_count = to_int(attrs.get("rawEventCount")) + projected = None + for key in ("maxSeq", "traceLastSeq", "endSeq", "projectedSeq", "lastProjectedSeq"): + value = to_int(attrs.get(key)) + if value is not None: + projected = value if projected is None else max(projected, value) + if after_seq is not None: + projected = after_seq + (raw_count or 0) + if projected is not None: + latest_projected_seq = projected if latest_projected_seq is None else max(latest_projected_seq, projected) + source_event_count = projected if source_event_count is None else max(source_event_count, projected) + 
has_more_values = [] + full_loaded_values = [] + for item in trace_read_spans: + attrs = item.get("attributes", {}) if isinstance(item.get("attributes"), dict) else {} + if "hasMore" in attrs: + has_more_values.append(to_bool(attrs.get("hasMore"))) + if "fullTraceLoaded" in attrs: + full_loaded_values.append(to_bool(attrs.get("fullTraceLoaded"))) + return { + "sourceEventCount": source_event_count, + "requestedSinceSeq": requested_since_seq, + "latestProjectedSeq": latest_projected_seq, + "traceStatus": { + "readCount": len(trace_read_spans), + "anyHasMore": any(value is True for value in has_more_values), + "anyFullTraceLoaded": any(value is True for value in full_loaded_values), + }, + "turnStatus": turn_statuses[-1] if turn_statuses else None, + "turnStatusCounts": [{"status": status, "count": count} for status, count in turn_status_counts.most_common()], + "traceEventReadSpans": trace_read_spans, + "turnStatusReadSpans": turn_read_spans, + "projectionSpans": projection_spans, + } + +def projection_lag_summary(agentrun, read_model): + source_event_count = read_model.get("sourceEventCount") + requested_since_seq = read_model.get("requestedSinceSeq") + latest_projected_seq = read_model.get("latestProjectedSeq") + agent_latest_seq = agentrun.get("latestSeq") + reasons = [] + status = "none" + if requested_since_seq is not None and source_event_count is not None and requested_since_seq > source_event_count: + status = "confirmed" + reasons.append("requested sinceSeq %s is greater than HWLAB sourceEventCount %s" % (requested_since_seq, source_event_count)) + if latest_projected_seq is not None and requested_since_seq is not None and latest_projected_seq < requested_since_seq: + status = "confirmed" + reasons.append("latestProjectedSeq %s is behind requested sinceSeq %s" % (latest_projected_seq, requested_since_seq)) + if agent_latest_seq is not None and source_event_count is not None and agent_latest_seq > source_event_count: + status = "confirmed" + 
reasons.append("AgentRun latestSeq %s is greater than HWLAB sourceEventCount %s" % (agent_latest_seq, source_event_count)) + if status == "none" and agentrun.get("terminalStatus") == "completed" and source_event_count is not None and read_model.get("traceStatus", {}).get("readCount", 0) > 0: + status = "suspected" + reasons.append("AgentRun completed while HWLAB read model still needs endpoint/status confirmation") + return { + "status": status, + "reasons": reasons, + } + +def root_cause_candidates(http_summary, agentrun, read_model, lag_summary, error_spans): + candidates = [] + if http_summary.get("actorForbidden"): + candidates.append({ + "code": "actor_forbidden", + "label": "actor forbidden", + "confidence": 0.98, + "summary": "actor forbidden: HWLAB Code Agent read endpoint returned HTTP 403, likely actor/owner mismatch for this trace or turn.", + "evidence": http_summary.get("problemCounts", [])[:5], + }) + if lag_summary.get("status") == "confirmed": + candidates.append({ + "code": "hwlab_projection_stale", + "label": "HWLAB projection stale", + "confidence": 0.94, + "summary": "HWLAB projection stale: read model sequence is behind the caller/requested event window.", + "evidence": { + "sourceEventCount": read_model.get("sourceEventCount"), + "requestedSinceSeq": read_model.get("requestedSinceSeq"), + "latestProjectedSeq": read_model.get("latestProjectedSeq"), + "reasons": lag_summary.get("reasons", []), + }, + }) + elif lag_summary.get("status") == "suspected": + candidates.append({ + "code": "hwlab_projection_lag_suspected", + "label": "HWLAB projection stale", + "confidence": 0.64, + "summary": "HWLAB projection stale is suspected because AgentRun is terminal but HWLAB read-model evidence is incomplete.", + "evidence": lag_summary.get("reasons", []), + }) + if agentrun.get("terminalStatus") == "completed": + candidates.append({ + "code": "agentrun_completed_not_provider_blocked", + "label": "AgentRun completed", + "confidence": 0.88, + "summary": 
"AgentRun completed: runner/provider reached terminal completed, so this trace is not explained by an active runner hang.", + "evidence": { + "terminalStatus": agentrun.get("terminalStatus"), + "terminalEventType": agentrun.get("terminalEventType"), + "runnerProviderClassification": agentrun.get("runnerProviderClassification"), + }, + }) + if error_spans and agentrun.get("terminalStatus") == "completed": + candidates.append({ + "code": "tool_call_failures_recovered", + "label": "tool call failures recovered", + "confidence": 0.42, + "summary": "tool call failure spans exist, but AgentRun still completed; treat them as secondary unless correlated with user-visible failure.", + "evidence": { + "errorSpanCount": len(error_spans), + "sampleNames": sorted(set(str(item.get("name") or "") for item in error_spans))[:5], + }, + }) + candidates.sort(key=lambda item: item.get("confidence", 0), reverse=True) + return candidates
+
+def resolve_trace():  # returns (OTel trace id or None, mapping dict or None, error text or None)
+    if TRACE_ID is not None:  # an explicit OTel trace id skips the search entirely
+        return TRACE_ID, {
+            "mode": "trace-id",
+            "businessTraceId": BUSINESS_TRACE_ID,
+            "otelTraceId": TRACE_ID,
+            "searchPath": None,
+            "searchOk": None,
+            "searchParseOk": None,
+            "candidateTraceCount": None,
+            "selectedMeta": None,
+        }, None
+    if SEARCH_PROXY_PATH is None:
+        return None, None, "missing trace id and search path"
+    search_rc, search_body, search_err = run_kubectl(SEARCH_PROXY_PATH, timeout=15)
+    search_parsed = parse_json(search_body)
+    metas = extract_traces(search_parsed)
+    selected, selected_trace_id = None, None
+    for meta in metas:  # the first search hit that carries a trace id is used
+        trace_id = trace_id_from_meta(meta)
+        if trace_id:
+            selected = meta
+            selected_trace_id = trace_id
+            break
+    mapping = {
+        "mode": "business-trace-id",
+        "businessTraceId": BUSINESS_TRACE_ID,
+        "otelTraceId": selected_trace_id,
+        "searchPath": SEARCH_PATH,
+        "searchProxyPath": SEARCH_PROXY_PATH,
+        "searchOk": search_rc == 0,
+        "searchParseOk": isinstance(search_parsed, dict),
+        "candidateTraceCount": len(metas),
+        "selectedMeta": compact_meta(selected),
+        "searchStderrTail": (search_err or "")[-2000:],
+    }
+    if RAW:
+        mapping["rawSearchBody"] = search_parsed if isinstance(search_parsed, dict) else search_body[-12000:]
+    if selected_trace_id is None:  # report a failed or unparseable search differently from a search that matched nothing
+        search_usable = search_rc == 0 and isinstance(search_parsed, dict)
+        return None, mapping, ("no OTel trace found for business trace" if search_usable else "tempo search failed or returned non-JSON output")
+    return selected_trace_id, mapping, None
+
+resolved_trace_id, mapping, resolve_error = resolve_trace() +if resolved_trace_id is None: + payload = { + "ok": False, + "mapping": mapping, + "error": resolve_error, + "summary": { + "rootCause": "otel trace not found", + "facts": [], + }, + "next": { + "expandWindow": "bun scripts/cli.ts platform-infra observability diagnose-code-agent --target ${target.id} --business-trace-id ${businessTraceIdForNext} --lookback-minutes 10080 --candidate-limit 100 --full", + "search": "bun scripts/cli.ts platform-infra observability search --target ${target.id} --query '{ .traceId = \\"${businessTraceIdForNext}\\" }' --lookback-minutes 10080 --candidate-limit 100 --limit 10", + }, + } + print(json.dumps(payload, ensure_ascii=False, indent=2)) + raise SystemExit(1) + +trace_path = TRACE_PATH_TEMPLATE.replace("{{traceId}}", resolved_trace_id) +trace_proxy_path = TRACE_PROXY_PREFIX + trace_path +trace_rc, trace_body, trace_err = run_kubectl(trace_proxy_path, timeout=25) +trace_parsed = parse_json(trace_body) +if not isinstance(trace_parsed, dict): + payload = { + "ok": False, + "mapping": mapping, + "tracePath": trace_path, + "traceProxyPath": trace_proxy_path, + "traceQueryOk": trace_rc == 0, + "traceParseOk": False, + "bodyBytes": len(trace_body.encode("utf-8")), + "bodyTail": trace_body[-12000:], + "stderrTail": (trace_err or "")[-4000:], + } + print(json.dumps(payload, ensure_ascii=False, indent=2)) + raise SystemExit(1) + +flat = flatten_trace(trace_parsed) +spans = flat["spans"] +services = flat["services"] +business_trace_ids = flat["businessTraceIds"] +error_spans = flat["errorSpans"] +http_summary = http_status_summary(spans) +agentrun = 
agentrun_summary(spans) +read_model = hwlab_read_model_summary(spans) +lag = projection_lag_summary(agentrun, read_model) +identity = identity_from_spans(spans) +candidates = root_cause_candidates(http_summary, agentrun, read_model, lag, error_spans) +expected_services = ["hwlab-cloud-api", "agentrun-manager", "agentrun-runner"] +service_path = { + service: ("reached" if service in services else "missing") + for service in expected_services +} +service_path["complete"] = all(service in services for service in expected_services) +facts = [] +if http_summary.get("actorForbidden"): + facts.append("actor forbidden") +if agentrun.get("terminalStatus") == "completed": + facts.append("AgentRun completed") +if lag.get("status") in ("confirmed", "suspected"): + facts.append("HWLAB projection stale") +if not facts: + facts.append("no dominant Code Agent root cause classified") +summary = { + "rootCause": "; ".join(facts), + "facts": facts, + "classification": { + "projectionLag": lag.get("status"), + "runnerProvider": agentrun.get("runnerProviderClassification"), + "actorForbidden": http_summary.get("actorForbidden"), + }, +} +evidence = { + "httpProblemSpanCount": len(http_summary.get("problemSpans", [])), + "httpProblemSpanSamples": [tiny_span(item) for item in http_summary.get("problemSpans", [])[:3]], + "terminalSpanCount": len(agentrun.get("terminalSpans", [])), + "terminalSpanSamples": [tiny_terminal(item) for item in agentrun.get("terminalSpans", [])[:3]], + "traceEventReadSpans": [tiny_span(item) for item in read_model.get("traceEventReadSpans", [])[:3]], + "turnStatusReadSpanCount": len(read_model.get("turnStatusReadSpans", [])), + "turnStatusReadSpanSamples": [tiny_span(item) for item in read_model.get("turnStatusReadSpans", [])[:3]], + "projectionSpanCount": len(read_model.get("projectionSpans", [])), + "projectionSpanTail": [tiny_span(item) for item in read_model.get("projectionSpans", [])[-3:]], + "errorSpanCount": len(error_spans), + "errorSpanSampleNames": 
sorted(set(str(item.get("name") or "") for item in error_spans))[:5], +} +payload = { + "ok": trace_rc == 0 and len(spans) > 0, + "mapping": mapping, + "tracePath": trace_path, + "traceProxyPath": trace_proxy_path, + "bodyBytes": len(trace_body.encode("utf-8")), + "traceParseOk": True, + "spanCount": len(spans), + "services": services, + "servicePath": service_path, + "businessTraceIds": business_trace_ids[:20], + "identity": identity, + "agentrun": { + "terminalStatus": agentrun.get("terminalStatus"), + "latestSeq": agentrun.get("latestSeq"), + "terminalEventType": agentrun.get("terminalEventType"), + "runnerProviderClassification": agentrun.get("runnerProviderClassification"), + }, + "hwlabReadModel": { + "sourceEventCount": read_model.get("sourceEventCount"), + "requestedSinceSeq": read_model.get("requestedSinceSeq"), + "latestProjectedSeq": read_model.get("latestProjectedSeq"), + "traceStatus": read_model.get("traceStatus"), + "turnStatus": read_model.get("turnStatus"), + "turnStatusCounts": read_model.get("turnStatusCounts"), + }, + "http": { + "statusCounts": http_summary.get("statusCounts", [])[:20], + "problemCounts": http_summary.get("problemCounts", [])[:20], + "actorForbidden": http_summary.get("actorForbidden"), + }, + "projectionLag": lag, + "summary": summary, + "rootCauseCandidates": candidates, + "spanNameCounts": flat["spanNameCounts"][:12], + "evidence": evidence, + "next": { + "diagnoseFull": "bun scripts/cli.ts platform-infra observability diagnose-code-agent --target ${target.id} --business-trace-id ${businessTraceIdForNext} --full", + "traceSummary": "bun scripts/cli.ts platform-infra observability trace --target ${target.id} --trace-id %s --limit 80" % resolved_trace_id, + "traceReads": "bun scripts/cli.ts platform-infra observability trace --target ${target.id} --trace-id %s --grep trace_events_read --limit 20 --full" % resolved_trace_id, + "turnStatus": "bun scripts/cli.ts platform-infra observability trace --target ${target.id} --trace-id 
%s --grep turn_status_read --limit 20 --full" % resolved_trace_id, + "projection": "bun scripts/cli.ts platform-infra observability trace --target ${target.id} --trace-id %s --grep projection_sync --limit 120 --full" % resolved_trace_id, + "runnerTerminal": "bun scripts/cli.ts platform-infra observability trace --target ${target.id} --trace-id %s --grep runner_terminal --limit 20 --full" % resolved_trace_id, + "raw": "bun scripts/cli.ts platform-infra observability diagnose-code-agent --target ${target.id} --trace-id %s --raw" % resolved_trace_id, + }, + "stderrTail": (trace_err or "")[-4000:], +} +if FULL: + payload["full"] = { + "traceEventReadSpans": read_model.get("traceEventReadSpans", []), + "turnStatusReadSpans": read_model.get("turnStatusReadSpans", []), + "projectionSpans": read_model.get("projectionSpans", [])[:LIMIT], + "terminalSpans": agentrun.get("terminalSpans", [])[:LIMIT], + "errorSpans": [public_span(item) for item in error_spans[:LIMIT]], + "httpProblemSpans": [public_span(item) for item in http_summary.get("problemSpans", [])[:LIMIT]], + "selectedSpans": [public_span(item) for item in spans if item.get("name") in ("trace_events_read", "turn_status_read", "projection_sync", "command_result")][:LIMIT], + } +if RAW: + payload["raw"] = raw_trace_shape(trace_parsed) +print(json.dumps(payload, ensure_ascii=False, indent=2 if FULL or RAW else None)) +raise SystemExit(0 if payload["ok"] else 1) +PY +`; +} + function configSummary(observability: ObservabilityConfig, target: ObservabilityTarget): Record { return { configPath: configLabel, diff --git a/scripts/src/platform-infra.ts b/scripts/src/platform-infra.ts index 354ece7d..72a22b87 100644 --- a/scripts/src/platform-infra.ts +++ b/scripts/src/platform-infra.ts @@ -304,6 +304,7 @@ export function platformInfraHelp(): unknown { "bun scripts/cli.ts platform-infra observability validate --target D601", "bun scripts/cli.ts platform-infra observability trace --target D601 --trace-id [--grep text] [--limit 
40] [--full|--raw]", "bun scripts/cli.ts platform-infra observability search --target D601 --grep 'no rollout found' [--lookback-minutes 360] [--candidate-limit 80] [--limit 20]", + "bun scripts/cli.ts platform-infra observability diagnose-code-agent --target D601 --business-trace-id [--full|--raw]", ], description: "Operate YAML-controlled platform-infra services such as Sub2API, LangBot, n8n, WeChat archive workflows and OpenTelemetry tracing. Public services use PK01 Caddy+FRP rather than Kubernetes Ingress, NodePort, or LoadBalancer.", target,