feat(otel): diagnose code agent traces

This commit is contained in:
Codex
2026-06-20 11:24:48 +00:00
parent 624a1307fc
commit 8ae4861f52
3 changed files with 885 additions and 1 deletions
+22
View File
@@ -88,6 +88,28 @@ OTel trace 内常见业务关联属性:
`Code Agent 代理暂时无法连接上游`、`provider-stream-disconnected`、Workbench 加载/转圈、turn idle 报错、AgentRun command terminal 状态时:
优先用一条诊断命令汇总业务 trace、OTel trace、服务追穿、AgentRun 终态、HWLAB 读模型和 HTTP 403/401/5xx 根因:
```bash
bun scripts/cli.ts platform-infra observability diagnose-code-agent \
--target D601 \
--business-trace-id <trc_...>
```
默认输出必须保持有界低噪声,重点看:
- `mapping.businessTraceId` / `mapping.otelTraceId`
- `servicePath` 是否同时到达 `hwlab-cloud-api`、`agentrun-manager`、`agentrun-runner`
- `identity` 里的 `runId`、`commandId`、`sessionId`、`runnerJobId`、`runnerId`、`backendProfile`、`sourceCommit`
- `agentrun.terminalStatus`、`terminalEventType`、`runnerProviderClassification`
- `hwlabReadModel.sourceEventCount`、`requestedSinceSeq`、`turnStatusCounts`
- `http.problemCounts`、`projectionLag.status`
- `summary.rootCause` 与按置信度排序的 `rootCauseCandidates`
只有需要展开 span 明细时使用 `--full`;只有排查 Tempo raw 响应或 CLI 解析器时使用 `--raw`。默认输出不得包含 Secret、Authorization header、DSN、可复制凭据或完整运行 transcript。
若没有业务 traceId 或诊断结果还需要 drill-down,再使用低层 trace/search:
1. 先确认 OTel backend ready
`bun scripts/cli.ts platform-infra observability status --target D601`
2. 查业务 trace 对应 OTel trace
+862 -1
View File
@@ -138,9 +138,17 @@ interface SearchOptions extends CommonOptions {
lookbackMinutes: number;
}
/**
 * Options for the `observability diagnose-code-agent` sub-command, layered on
 * top of the shared `CommonOptions`.
 *
 * Produced by `parseDiagnoseCodeAgentOptions`, which rejects argument lists
 * where both `businessTraceId` and `traceId` would be null.
 */
interface DiagnoseCodeAgentOptions extends CommonOptions {
/** Business trace id in the `trc_<id>` form, or null when no such flag was given. */
businessTraceId: string | null;
/** 32-hex-character OpenTelemetry trace id (lower-cased by the parser), or null when not given. */
traceId: string | null;
/** Integer in 1..200 (parser default 40); forwarded to the remote diagnose script as `LIMIT`. */
limit: number;
/** Integer in 1..100 (parser default 20); sent as the `limit` parameter of the trace search request. */
candidateLimit: number;
/** Integer in 1..10080 (parser default 10080); length, in minutes, of the search window that ends at "now". */
lookbackMinutes: number;
}
export function observabilityHelp(): Record<string, unknown> {
return {
command: "platform-infra observability plan|apply|status|validate|trace|search",
command: "platform-infra observability plan|apply|status|validate|trace|search|diagnose-code-agent",
output: "json",
configTruth: "config/platform-infra/observability.yaml",
spec: "PJ2026-01060501 OTel追踪 draft-2026-06-19-p0",
@@ -152,6 +160,7 @@ export function observabilityHelp(): Record<string, unknown> {
"bun scripts/cli.ts platform-infra observability validate --target D601 [--full|--raw]",
"bun scripts/cli.ts platform-infra observability trace --target D601 --trace-id <traceId> [--grep provider-stream-disconnected] [--limit 40] [--full|--raw]",
"bun scripts/cli.ts platform-infra observability search --target D601 --grep 'no rollout found' [--lookback-minutes 360] [--candidate-limit 80] [--limit 20] [--full|--raw]",
"bun scripts/cli.ts platform-infra observability diagnose-code-agent --target D601 --business-trace-id <trc_...> [--full|--raw]",
],
boundary: "Prometheus remains the metrics source; this command owns only platform-infra OTel Collector, trace backend readiness, and trace lookup.",
};
@@ -165,6 +174,7 @@ export async function runPlatformObservabilityCommand(config: UniDeskConfig, arg
if (action === "validate") return await validate(config, parseCommonOptions(args.slice(1)));
if (action === "trace") return await trace(config, parseTraceOptions(args.slice(1)));
if (action === "search") return await search(config, parseSearchOptions(args.slice(1)));
if (action === "diagnose-code-agent") return await diagnoseCodeAgent(config, parseDiagnoseCodeAgentOptions(args.slice(1)));
return { ok: false, error: "unsupported-platform-infra-observability-command", args, help: observabilityHelp() };
}
@@ -302,6 +312,60 @@ function parseSearchOptions(args: string[]): SearchOptions {
return { ...parseCommonOptions(commonArgs), grep, query, limit, candidateLimit, lookbackMinutes };
}
/**
 * Parses the argument tail of `observability diagnose-code-agent`.
 *
 * Flags owned by this command are consumed here; every other token (and the
 * value following `--target`) is collected and handed to `parseCommonOptions`.
 *
 * @param args Arguments after the sub-command name.
 * @returns The common options merged with the diagnose-specific ones.
 * @throws Error when a flag is missing its value, a value fails validation,
 *   or neither a business trace id nor an OTel trace id was supplied.
 */
function parseDiagnoseCodeAgentOptions(args: string[]): DiagnoseCodeAgentOptions {
  const businessTraceIdPattern = /^trc_[A-Za-z0-9_-]+$/u;
  const otelTraceIdPattern = /^[0-9a-fA-F]{32}$/u;

  const passthrough: string[] = [];
  let businessTraceId: string | null = null;
  let traceId: string | null = null;
  let limit = 40;
  let candidateLimit = 20;
  let lookbackMinutes = 10080;
  let cursor = 0;

  // Reads the token after the flag at `cursor`; another `--flag` does not count as a value.
  const valueFor = (flag: string): string => {
    const next = args[cursor + 1];
    if (next === undefined || next.startsWith("--")) throw new Error(`${flag} requires a value`);
    return next;
  };

  // Reads the flag's value and validates it as an integer within 1..max.
  const integerFor = (flag: string, max: number): number => {
    const numeric = Number(valueFor(flag));
    if (!Number.isInteger(numeric) || numeric < 1 || numeric > max) {
      throw new Error(`${flag} must be an integer from 1 to ${max}`);
    }
    return numeric;
  };

  while (cursor < args.length) {
    const flag = args[cursor];
    switch (flag) {
      case "--business-trace-id":
      case "--trace": {
        const candidate = valueFor(flag);
        if (!businessTraceIdPattern.test(candidate)) throw new Error(`${flag} must look like trc_<id>`);
        businessTraceId = candidate;
        cursor += 2;
        break;
      }
      case "--trace-id":
      case "--otel-trace-id": {
        const candidate = valueFor(flag);
        if (!otelTraceIdPattern.test(candidate)) {
          throw new Error(`${flag} must be a 32-character OpenTelemetry trace id encoded as hex`);
        }
        // Normalise to lower case so the id is stored in a single canonical form.
        traceId = candidate.toLowerCase();
        cursor += 2;
        break;
      }
      case "--limit":
        limit = integerFor(flag, 200);
        cursor += 2;
        break;
      case "--candidate-limit":
        candidateLimit = integerFor(flag, 100);
        cursor += 2;
        break;
      case "--lookback-minutes":
      case "--since-minutes":
        lookbackMinutes = integerFor(flag, 10080);
        cursor += 2;
        break;
      default:
        passthrough.push(flag);
        if (flag === "--target") {
          // Keep the target value paired with its flag; a missing value is forwarded as "".
          passthrough.push(args[cursor + 1] ?? "");
          cursor += 2;
        } else {
          cursor += 1;
        }
        break;
    }
  }

  if (businessTraceId === null && traceId === null) {
    throw new Error("observability diagnose-code-agent requires --business-trace-id <trc_...> or --trace-id <otelTraceId>");
  }
  return { ...parseCommonOptions(passthrough), businessTraceId, traceId, limit, candidateLimit, lookbackMinutes };
}
function readObservabilityConfig(): ObservabilityConfig {
const parsed = Bun.YAML.parse(readFileSync(configFile, "utf8")) as unknown;
const root = asRecord(parsed, configLabel);
@@ -611,6 +675,42 @@ async function search(config: UniDeskConfig, options: SearchOptions): Promise<Re
};
}
/**
 * Runs the code-agent trace diagnosis on the resolved target and wraps the
 * script's JSON output in the command result envelope.
 *
 * When only a business trace id is known, a trace search path is built so the
 * remote script can resolve the OTel trace id first; an explicit OTel trace id
 * takes precedence and skips the search.
 *
 * @param config CLI configuration passed through to `capture`.
 * @param options Parsed diagnose options.
 * @returns A result record; `ok` is true only when the remote command exited
 *   with code 0 and its parsed JSON payload reports `ok === true`.
 */
async function diagnoseCodeAgent(config: UniDeskConfig, options: DiagnoseCodeAgentOptions): Promise<Record<string, unknown>> {
  const observability = readObservabilityConfig();
  const target = resolveTarget(observability, options.targetId);
  const { businessTraceId, traceId, lookbackMinutes, candidateLimit, limit } = options;

  // Search window: [now - lookback, now], expressed in whole seconds.
  const nowSeconds = Math.floor(Date.now() / 1000);
  const windowStartSeconds = nowSeconds - (lookbackMinutes * 60);

  const searchPath: string | null = traceId === null && businessTraceId !== null
    ? `/api/search?${new URLSearchParams({
      start: String(windowStartSeconds),
      end: String(nowSeconds),
      limit: String(candidateLimit),
      q: `{ .traceId = "${businessTraceId}" }`,
    }).toString()}`
    : null;

  const script = diagnoseCodeAgentScript(observability, target, searchPath, options);
  const result = await capture(config, target.route, ["sh"], script);
  const parsed = parseJsonOutput(result.stdout);
  const succeeded = result.exitCode === 0 && parsed?.ok === true;

  const query = {
    backend: observability.traceBackend.type,
    service: observability.traceBackend.serviceName,
    businessTraceId,
    traceId,
    path: searchPath,
    lookbackMinutes,
    candidateLimit,
    limit,
  };

  return {
    ok: succeeded,
    action: "platform-infra-observability-diagnose-code-agent",
    mutation: false,
    target: targetSummary(target),
    query,
    // Fall back to the captured process output when the redacted payload is null/undefined.
    result: redactSensitiveUnknown(parsed) ?? compactCapture(result, { full: true }),
  };
}
function renderManifest(observability: ObservabilityConfig, target: ObservabilityTarget): string {
const collectorImage = imageReference(observability.images.collector);
const tempoImage = imageReference(observability.images.tempo);
@@ -1748,6 +1848,767 @@ PY
`;
}
function diagnoseCodeAgentScript(observability: ObservabilityConfig, target: ObservabilityTarget, searchPath: string | null, options: DiagnoseCodeAgentOptions): string {
const proxyPrefix = `/api/v1/namespaces/${target.namespace}/services/http:${observability.traceBackend.serviceName}:http/proxy`;
const searchProxyPath = searchPath === null ? null : `${proxyPrefix}${searchPath}`;
const traceIdLiteral = options.traceId === null ? "None" : JSON.stringify(options.traceId);
const businessTraceIdLiteral = options.businessTraceId === null ? "None" : JSON.stringify(options.businessTraceId);
const searchPathLiteral = searchPath === null ? "None" : JSON.stringify(searchPath);
const searchProxyPathLiteral = searchProxyPath === null ? "None" : JSON.stringify(searchProxyPath);
const businessTraceIdForNext = options.businessTraceId ?? "<trc_...>";
return `
set -u
python3 - <<'PY'
import collections, json, subprocess, time
BUSINESS_TRACE_ID = ${businessTraceIdLiteral}
TRACE_ID = ${traceIdLiteral}
SEARCH_PATH = ${searchPathLiteral}
SEARCH_PROXY_PATH = ${searchProxyPathLiteral}
TRACE_PROXY_PREFIX = ${JSON.stringify(proxyPrefix)}
TRACE_PATH_TEMPLATE = ${JSON.stringify(observability.probes.traceQueryPathTemplate)}
FULL = ${options.full ? "True" : "False"}
RAW = ${options.raw ? "True" : "False"}
LIMIT = ${options.limit}
DEADLINE = time.time() + 50
IMPORTANT_ATTRS = [
"traceId", "otel.trace_id", "agentrun.stage", "runId", "commandId",
"sessionId", "turnId", "threadId", "runnerJobId", "failureKind", "willRetry",
"terminalStatus", "phase", "message", "http.route", "http.status_code",
"http.method", "eventType", "runnerId", "attemptId", "backendProfile",
"sourceCommit", "jobName", "podName", "logPath", "storageKind",
"storagePvcName", "storagePvcPhase", "resumeMode", "rolloutId",
"returnedEvents", "sinceSeq", "afterSeq", "limit", "fromSeq", "toSeq",
"totalEvents", "hasMore", "fullTraceLoaded", "rawEventCount",
"maxSeq", "traceLastSeq", "endSeq", "commandFiltered",
"seq", "eventSeq", "sourceSeq", "sourceLatestSeq", "latestSeq",
"lastSeq", "sourceEventCount", "projectedSeq", "lastProjectedSeq",
"status", "turnStatus", "error", "error.message", "exception.type",
"exception.message", "valuesPrinted",
]
IDENTITY_KEYS = [
"runId", "commandId", "sessionId", "turnId", "threadId",
"runnerJobId", "runnerId", "attemptId", "backendProfile",
"sourceCommit", "jobName", "podName",
]
def run_kubectl(proxy_path, timeout=15):
try:
proc = subprocess.run(
["kubectl", "get", "--raw", proxy_path],
capture_output=True,
text=True,
timeout=timeout,
)
return proc.returncode, proc.stdout, proc.stderr
except subprocess.TimeoutExpired as exc:
return 124, exc.stdout or "", "timeout while querying tempo"
except Exception as exc:
return 125, "", str(exc)
def parse_json(body):
try:
return json.loads(body) if body else None
except Exception:
return None
def attr_value(value):
if not isinstance(value, dict):
return value
inner = value.get("value", value)
if not isinstance(inner, dict):
return inner
for key in ("stringValue", "intValue", "doubleValue", "boolValue"):
if key in inner:
return inner.get(key)
if "arrayValue" in inner:
return "<array>"
if "kvlistValue" in inner:
return "<kvlist>"
if "bytesValue" in inner:
return "<bytes>"
return inner
def attrs_to_dict(attrs):
if not isinstance(attrs, list):
return {}
output = {}
for item in attrs:
if not isinstance(item, dict):
continue
key = item.get("key")
if isinstance(key, str):
output[key] = attr_value(item.get("value"))
return output
def selected_attrs(attrs):
output = {}
for key in IMPORTANT_ATTRS:
if key in attrs:
output[key] = attrs[key]
return output
def to_int(value):
if value is None:
return None
if isinstance(value, bool):
return 1 if value else 0
if isinstance(value, int):
return value
if isinstance(value, float) and value.is_integer():
return int(value)
if isinstance(value, str):
try:
return int(value.strip())
except Exception:
return None
return None
def to_bool(value):
if isinstance(value, bool):
return value
if isinstance(value, str):
lowered = value.strip().lower()
if lowered in ("true", "1", "yes"):
return True
if lowered in ("false", "0", "no"):
return False
return None
def nanos_to_ms(start, end):
try:
return round((int(end) - int(start)) / 1000000, 3)
except Exception:
return None
def span_status_code(status):
if not isinstance(status, dict):
return None
return status.get("code")
def is_error_span(span, attrs):
status = span.get("status") if isinstance(span, dict) else {}
code = span_status_code(status)
name = str(span.get("name", "")).lower() if isinstance(span, dict) else ""
failure = str(attrs.get("failureKind", "")).strip()
terminal = str(attrs.get("terminalStatus", "")).strip()
return code in ("STATUS_CODE_ERROR", 2) or "error" in name or bool(failure) or terminal in ("failed", "blocked")
def batches_from_body(body):
if not isinstance(body, dict):
return []
for key in ("batches", "resourceSpans"):
value = body.get(key)
if isinstance(value, list):
return value
return []
def scope_spans_from_batch(batch):
if not isinstance(batch, dict):
return []
for key in ("scopeSpans", "instrumentationLibrarySpans"):
value = batch.get(key)
if isinstance(value, list):
return value
return []
def extract_traces(search_body):
if not isinstance(search_body, dict):
return []
value = search_body.get("traces")
if isinstance(value, list):
return value
value = search_body.get("data")
if isinstance(value, list):
return value
return []
def trace_id_from_meta(meta):
if not isinstance(meta, dict):
return None
for key in ("traceID", "traceId", "trace_id"):
value = meta.get(key)
if isinstance(value, str) and value:
return value
return None
def compact_meta(meta):
if not isinstance(meta, dict):
return {}
output = {}
for key in ("traceID", "traceId", "rootServiceName", "rootTraceName", "startTimeUnixNano", "durationMs"):
if key in meta:
output[key] = meta[key]
return output
def compact_span(span, service, resource_attrs, scope_name, index):
raw_attrs = attrs_to_dict(span.get("attributes"))
attrs = selected_attrs(raw_attrs)
status = span.get("status") if isinstance(span.get("status"), dict) else {}
item = {
"name": span.get("name"),
"service": service,
"scope": scope_name,
"statusCode": span_status_code(status),
"statusMessage": status.get("message"),
"durationMs": nanos_to_ms(span.get("startTimeUnixNano"), span.get("endTimeUnixNano")),
"attributes": attrs,
"_index": index,
"_start": to_int(span.get("startTimeUnixNano")) or index,
}
if "deployment.environment" in resource_attrs:
item["environment"] = resource_attrs.get("deployment.environment")
if "git.commit" in resource_attrs:
item["gitCommit"] = resource_attrs.get("git.commit")
return item, raw_attrs
def public_span(item):
if not isinstance(item, dict):
return item
return {key: value for key, value in item.items() if not key.startswith("_")}
def tiny_span(item):
if not isinstance(item, dict):
return item
attrs = item.get("attributes", {}) if isinstance(item.get("attributes"), dict) else {}
keep_attrs = {}
for key in (
"http.route", "http.status_code", "http.method", "status",
"terminalStatus", "eventType", "returnedEvents", "sinceSeq",
"fromSeq", "toSeq", "totalEvents", "hasMore", "fullTraceLoaded",
"afterSeq", "rawEventCount", "failureKind",
):
if key in attrs:
keep_attrs[key] = attrs[key]
return {
"name": item.get("name"),
"service": item.get("service"),
"attributes": keep_attrs,
}
def tiny_terminal(terminal):
if not isinstance(terminal, dict):
return terminal
return {
"status": terminal.get("status"),
"eventType": terminal.get("eventType"),
"span": tiny_span(terminal.get("span")),
}
def identity_from_spans(spans):
identity = {}
for key in IDENTITY_KEYS:
preferred = None
fallback = None
for item in spans:
attrs = item.get("attributes", {}) if isinstance(item.get("attributes"), dict) else {}
value = attrs.get(key)
if value in (None, ""):
continue
if fallback is None:
fallback = value
service = str(item.get("service") or "")
if service.startswith("agentrun"):
preferred = value
break
if preferred is not None or fallback is not None:
identity[key] = preferred if preferred is not None else fallback
return identity
def flatten_trace(trace_body):
services = set()
business_trace_ids = set()
name_counts = collections.Counter()
spans = []
error_spans = []
raw_index = 0
for batch in batches_from_body(trace_body):
resource = batch.get("resource") if isinstance(batch.get("resource"), dict) else {}
resource_attrs = attrs_to_dict(resource.get("attributes"))
service = resource_attrs.get("service.name") or "<unknown-service>"
services.add(service)
for scope_span in scope_spans_from_batch(batch):
scope = scope_span.get("scope") if isinstance(scope_span.get("scope"), dict) else {}
scope_name = scope.get("name")
raw_spans = scope_span.get("spans") if isinstance(scope_span.get("spans"), list) else []
for span in raw_spans:
if not isinstance(span, dict):
continue
item, raw_attrs = compact_span(span, service, resource_attrs, scope_name, raw_index)
raw_index += 1
attrs = item.get("attributes", {}) if isinstance(item.get("attributes"), dict) else {}
if isinstance(attrs.get("traceId"), str):
business_trace_ids.add(attrs.get("traceId"))
name = str(item.get("name") or "")
name_counts[name] += 1
spans.append(item)
if is_error_span(span, raw_attrs):
error_spans.append(item)
spans.sort(key=lambda item: (item.get("_start", 0), item.get("_index", 0)))
error_spans.sort(key=lambda item: (item.get("_start", 0), item.get("_index", 0)))
return {
"spans": spans,
"services": sorted(services),
"businessTraceIds": sorted(business_trace_ids),
"spanNameCounts": [{"name": name, "count": count} for name, count in name_counts.most_common(20)],
"errorSpans": error_spans,
}
def raw_trace_shape(trace_body):
batches = batches_from_body(trace_body)
scope_count = 0
span_count = 0
first_resource_attrs = {}
first_scope_keys = []
first_span_keys = []
first_span_attr_keys = []
for batch_index, batch in enumerate(batches):
resource = batch.get("resource") if isinstance(batch.get("resource"), dict) else {}
resource_attrs = attrs_to_dict(resource.get("attributes"))
if batch_index == 0:
for key in ("service.name", "deployment.environment", "git.commit"):
if key in resource_attrs:
first_resource_attrs[key] = resource_attrs[key]
for scope_span in scope_spans_from_batch(batch):
scope_count += 1
if not first_scope_keys and isinstance(scope_span, dict):
first_scope_keys = sorted(scope_span.keys())
raw_spans = scope_span.get("spans") if isinstance(scope_span.get("spans"), list) else []
span_count += len(raw_spans)
if not first_span_keys and raw_spans and isinstance(raw_spans[0], dict):
first_span_keys = sorted(raw_spans[0].keys())
first_span_attr_keys = sorted(attrs_to_dict(raw_spans[0].get("attributes")).keys())[:50]
return {
"mode": "bounded-raw-shape",
"note": "Full Tempo body is intentionally not printed to avoid secrets/raw transcript and SSH 60s output timeouts.",
"topLevelKeys": sorted(trace_body.keys()) if isinstance(trace_body, dict) else [],
"resourceSpanCount": len(batches),
"scopeSpanCount": scope_count,
"spanCount": span_count,
"firstResourceAttributes": first_resource_attrs,
"firstScopeKeys": first_scope_keys,
"firstSpanKeys": first_span_keys,
"firstSpanAttributeKeys": first_span_attr_keys,
}
def http_status_summary(spans):
grouped = collections.Counter()
problem_spans = []
for item in spans:
attrs = item.get("attributes", {}) if isinstance(item.get("attributes"), dict) else {}
code = to_int(attrs.get("http.status_code"))
if code is None:
continue
method = attrs.get("http.method") or ""
route = attrs.get("http.route") or item.get("name") or "<unknown-route>"
grouped[(str(method), str(route), code)] += 1
if code in (401, 403) or code >= 500:
problem_spans.append(item)
status_counts = [
{"method": method or None, "route": route, "status": status, "count": count}
for (method, route, status), count in grouped.most_common()
]
problem_counts = [
{"method": method or None, "route": route, "status": status, "count": count}
for (method, route, status), count in grouped.most_common()
if status in (401, 403) or status >= 500
]
return {
"statusCounts": status_counts[:20],
"problemCounts": problem_counts[:20],
"problemSpans": problem_spans,
"actorForbidden": any(to_int((item.get("attributes") or {}).get("http.status_code")) == 403 and item.get("service") == "hwlab-cloud-api" for item in problem_spans),
}
def agentrun_summary(spans):
terminal_spans = []
latest_seq = None
seq_keys = ("maxSeq", "traceLastSeq", "endSeq", "seq", "eventSeq", "sourceSeq", "sourceLatestSeq", "latestSeq", "lastSeq")
for item in spans:
service = str(item.get("service") or "")
name = str(item.get("name") or "")
attrs = item.get("attributes", {}) if isinstance(item.get("attributes"), dict) else {}
if service.startswith("agentrun"):
for key in seq_keys:
value = to_int(attrs.get(key))
if value is not None:
latest_seq = value if latest_seq is None else max(latest_seq, value)
terminal_status = attrs.get("terminalStatus")
if terminal_status or name.startswith("runner_terminal."):
derived = terminal_status
if not derived and name.startswith("runner_terminal."):
derived = name.split(".", 1)[1]
terminal_spans.append({
"status": derived,
"eventType": attrs.get("eventType"),
"span": public_span(item),
})
terminal_status = terminal_spans[-1]["status"] if terminal_spans else None
terminal_event_type = None
for terminal in reversed(terminal_spans):
if terminal.get("eventType"):
terminal_event_type = terminal.get("eventType")
break
if terminal_status in ("completed", "succeeded", "success"):
runner_classification = "completed"
elif terminal_status in ("failed", "error", "timeout", "cancelled"):
runner_classification = "failed"
elif terminal_status in ("blocked",):
runner_classification = "blocked"
elif any(str(item.get("service") or "") == "agentrun-runner" for item in spans):
runner_classification = "running"
else:
runner_classification = "unknown"
return {
"terminalStatus": terminal_status,
"latestSeq": latest_seq,
"terminalEventType": terminal_event_type,
"terminalSpans": terminal_spans,
"runnerProviderClassification": runner_classification,
}
def hwlab_read_model_summary(spans):
trace_read_spans = []
turn_read_spans = []
projection_spans = []
source_event_count = None
requested_since_seq = None
latest_projected_seq = None
turn_statuses = []
turn_status_counts = collections.Counter()
for item in spans:
if item.get("service") != "hwlab-cloud-api":
continue
name = str(item.get("name") or "")
attrs = item.get("attributes", {}) if isinstance(item.get("attributes"), dict) else {}
if name == "trace_events_read":
trace_read_spans.append(public_span(item))
for key in ("totalEvents", "sourceEventCount"):
value = to_int(attrs.get(key))
if value is not None:
source_event_count = value if source_event_count is None else max(source_event_count, value)
for key in ("sinceSeq", "fromSeq"):
value = to_int(attrs.get(key))
if value is not None:
requested_since_seq = value if requested_since_seq is None else max(requested_since_seq, value)
if name == "turn_status_read":
turn_read_spans.append(public_span(item))
if attrs.get("turnStatus") is not None or attrs.get("status") is not None:
status_value = attrs.get("turnStatus") if attrs.get("turnStatus") is not None else attrs.get("status")
turn_statuses.append(status_value)
turn_status_counts[str(status_value)] += 1
if name == "projection_sync":
projection_spans.append(public_span(item))
after_seq = to_int(attrs.get("afterSeq"))
raw_count = to_int(attrs.get("rawEventCount"))
projected = None
for key in ("maxSeq", "traceLastSeq", "endSeq", "projectedSeq", "lastProjectedSeq"):
value = to_int(attrs.get(key))
if value is not None:
projected = value if projected is None else max(projected, value)
if after_seq is not None:
projected = after_seq + (raw_count or 0)
if projected is not None:
latest_projected_seq = projected if latest_projected_seq is None else max(latest_projected_seq, projected)
source_event_count = projected if source_event_count is None else max(source_event_count, projected)
has_more_values = []
full_loaded_values = []
for item in trace_read_spans:
attrs = item.get("attributes", {}) if isinstance(item.get("attributes"), dict) else {}
if "hasMore" in attrs:
has_more_values.append(to_bool(attrs.get("hasMore")))
if "fullTraceLoaded" in attrs:
full_loaded_values.append(to_bool(attrs.get("fullTraceLoaded")))
return {
"sourceEventCount": source_event_count,
"requestedSinceSeq": requested_since_seq,
"latestProjectedSeq": latest_projected_seq,
"traceStatus": {
"readCount": len(trace_read_spans),
"anyHasMore": any(value is True for value in has_more_values),
"anyFullTraceLoaded": any(value is True for value in full_loaded_values),
},
"turnStatus": turn_statuses[-1] if turn_statuses else None,
"turnStatusCounts": [{"status": status, "count": count} for status, count in turn_status_counts.most_common()],
"traceEventReadSpans": trace_read_spans,
"turnStatusReadSpans": turn_read_spans,
"projectionSpans": projection_spans,
}
def projection_lag_summary(agentrun, read_model):
source_event_count = read_model.get("sourceEventCount")
requested_since_seq = read_model.get("requestedSinceSeq")
latest_projected_seq = read_model.get("latestProjectedSeq")
agent_latest_seq = agentrun.get("latestSeq")
reasons = []
status = "none"
if requested_since_seq is not None and source_event_count is not None and requested_since_seq > source_event_count:
status = "confirmed"
reasons.append("requested sinceSeq %s is greater than HWLAB sourceEventCount %s" % (requested_since_seq, source_event_count))
if latest_projected_seq is not None and requested_since_seq is not None and latest_projected_seq < requested_since_seq:
status = "confirmed"
reasons.append("latestProjectedSeq %s is behind requested sinceSeq %s" % (latest_projected_seq, requested_since_seq))
if agent_latest_seq is not None and source_event_count is not None and agent_latest_seq > source_event_count:
status = "confirmed"
reasons.append("AgentRun latestSeq %s is greater than HWLAB sourceEventCount %s" % (agent_latest_seq, source_event_count))
if status == "none" and agentrun.get("terminalStatus") == "completed" and source_event_count is not None and read_model.get("traceStatus", {}).get("readCount", 0) > 0:
status = "suspected"
reasons.append("AgentRun completed while HWLAB read model still needs endpoint/status confirmation")
return {
"status": status,
"reasons": reasons,
}
def root_cause_candidates(http_summary, agentrun, read_model, lag_summary, error_spans):
candidates = []
if http_summary.get("actorForbidden"):
candidates.append({
"code": "actor_forbidden",
"label": "actor forbidden",
"confidence": 0.98,
"summary": "actor forbidden: HWLAB Code Agent read endpoint returned HTTP 403, likely actor/owner mismatch for this trace or turn.",
"evidence": http_summary.get("problemCounts", [])[:5],
})
if lag_summary.get("status") == "confirmed":
candidates.append({
"code": "hwlab_projection_stale",
"label": "HWLAB projection stale",
"confidence": 0.94,
"summary": "HWLAB projection stale: read model sequence is behind the caller/requested event window.",
"evidence": {
"sourceEventCount": read_model.get("sourceEventCount"),
"requestedSinceSeq": read_model.get("requestedSinceSeq"),
"latestProjectedSeq": read_model.get("latestProjectedSeq"),
"reasons": lag_summary.get("reasons", []),
},
})
elif lag_summary.get("status") == "suspected":
candidates.append({
"code": "hwlab_projection_lag_suspected",
"label": "HWLAB projection stale",
"confidence": 0.64,
"summary": "HWLAB projection stale is suspected because AgentRun is terminal but HWLAB read-model evidence is incomplete.",
"evidence": lag_summary.get("reasons", []),
})
if agentrun.get("terminalStatus") == "completed":
candidates.append({
"code": "agentrun_completed_not_provider_blocked",
"label": "AgentRun completed",
"confidence": 0.88,
"summary": "AgentRun completed: runner/provider reached terminal completed, so this trace is not explained by an active runner hang.",
"evidence": {
"terminalStatus": agentrun.get("terminalStatus"),
"terminalEventType": agentrun.get("terminalEventType"),
"runnerProviderClassification": agentrun.get("runnerProviderClassification"),
},
})
if error_spans and agentrun.get("terminalStatus") == "completed":
candidates.append({
"code": "tool_call_failures_recovered",
"label": "tool call failures recovered",
"confidence": 0.42,
"summary": "tool call failure spans exist, but AgentRun still completed; treat them as secondary unless correlated with user-visible failure.",
"evidence": {
"errorSpanCount": len(error_spans),
"sampleNames": sorted(set(str(item.get("name") or "") for item in error_spans))[:5],
},
})
candidates.sort(key=lambda item: item.get("confidence", 0), reverse=True)
return candidates
def resolve_trace():
    """Work out which OTel trace id the diagnosis should inspect.

    Returns a 3-tuple (otel_trace_id, mapping, error):
      - otel_trace_id: the trace id to fetch, or None when none was resolved.
      - mapping: dict describing how the id was obtained; None when neither a
        trace id nor a search proxy path is available.
      - error: None on success, otherwise a short failure message.
    """
    # An explicit trace id wins: no search is issued, so every search-related
    # field is reported as None.
    if TRACE_ID is not None:
        direct_mapping = {
            "mode": "trace-id",
            "businessTraceId": BUSINESS_TRACE_ID,
            "otelTraceId": TRACE_ID,
            "searchPath": None,
            "searchOk": None,
            "searchParseOk": None,
            "candidateTraceCount": None,
            "selectedMeta": None,
        }
        return TRACE_ID, direct_mapping, None

    if SEARCH_PROXY_PATH is None:
        return None, None, "missing trace id and search path"

    # Run the search through kubectl and decode whatever came back.
    search_rc, search_body, search_err = run_kubectl(SEARCH_PROXY_PATH, timeout=15)
    search_parsed = parse_json(search_body)
    parsed_is_dict = isinstance(search_parsed, dict)
    metas = extract_traces(search_parsed)

    # Take the first candidate that yields a truthy trace id. The generators are
    # lazy, so probing stops as soon as a match is found.
    probed = ((meta, trace_id_from_meta(meta)) for meta in metas)
    chosen_meta, chosen_trace_id = next(
        ((meta, found_id) for meta, found_id in probed if found_id),
        (None, None),
    )

    mapping = {
        "mode": "business-trace-id",
        "businessTraceId": BUSINESS_TRACE_ID,
        "otelTraceId": chosen_trace_id,
        "searchPath": SEARCH_PATH,
        "searchProxyPath": SEARCH_PROXY_PATH,
        "searchOk": search_rc == 0,
        "searchParseOk": parsed_is_dict,
        "candidateTraceCount": len(metas),
        "selectedMeta": compact_meta(chosen_meta),
        "searchStderrTail": (search_err or "")[-2000:],
    }
    if RAW:
        # Raw mode keeps the parsed search response, or the last 12000
        # characters of the body when it did not parse to a dict.
        mapping["rawSearchBody"] = search_parsed if parsed_is_dict else search_body[-12000:]

    failure = "no OTel trace found for business trace" if chosen_trace_id is None else None
    return chosen_trace_id, mapping, failure
# ---- Main flow: resolve the trace, fetch it, summarize it, print one JSON document ----

# Step 1: decide which OTel trace to inspect. resolve_trace() returns
# (trace id, mapping dict, error message); a None trace id means there is
# nothing to fetch.
resolved_trace_id, mapping, resolve_error = resolve_trace()
if resolved_trace_id is None:
    # Report the failed lookup together with follow-up commands that retry with
    # a 10080-minute lookback and a candidate limit of 100, then exit with 1.
    # NOTE(review): the target.id / businessTraceIdForNext placeholders in the
    # command strings appear to be substituted by the enclosing TypeScript
    # template before this script runs - confirm against the caller.
    payload = {
        "ok": False,
        "mapping": mapping,
        "error": resolve_error,
        "summary": {
            "rootCause": "otel trace not found",
            "facts": [],
        },
        "next": {
            "expandWindow": "bun scripts/cli.ts platform-infra observability diagnose-code-agent --target ${target.id} --business-trace-id ${businessTraceIdForNext} --lookback-minutes 10080 --candidate-limit 100 --full",
            "search": "bun scripts/cli.ts platform-infra observability search --target ${target.id} --query '{ .traceId = \\"${businessTraceIdForNext}\\" }' --lookback-minutes 10080 --candidate-limit 100 --limit 10",
        },
    }
    print(json.dumps(payload, ensure_ascii=False, indent=2))
    raise SystemExit(1)

# Step 2: fetch the trace body. The path comes from the template with its
# {{traceId}} placeholder replaced, prefixed with the proxy prefix.
trace_path = TRACE_PATH_TEMPLATE.replace("{{traceId}}", resolved_trace_id)
trace_proxy_path = TRACE_PROXY_PREFIX + trace_path
trace_rc, trace_body, trace_err = run_kubectl(trace_proxy_path, timeout=25)
trace_parsed = parse_json(trace_body)
if not isinstance(trace_parsed, dict):
    # The response did not parse to a dict: emit bounded tails of the body
    # (last 12000 characters) and stderr (last 4000 characters), then exit 1.
    payload = {
        "ok": False,
        "mapping": mapping,
        "tracePath": trace_path,
        "traceProxyPath": trace_proxy_path,
        "traceQueryOk": trace_rc == 0,
        "traceParseOk": False,
        "bodyBytes": len(trace_body.encode("utf-8")),
        "bodyTail": trace_body[-12000:],
        "stderrTail": (trace_err or "")[-4000:],
    }
    print(json.dumps(payload, ensure_ascii=False, indent=2))
    raise SystemExit(1)

# Step 3: flatten the trace and derive the per-area summaries.
flat = flatten_trace(trace_parsed)
spans = flat["spans"]
services = flat["services"]
business_trace_ids = flat["businessTraceIds"]
error_spans = flat["errorSpans"]
http_summary = http_status_summary(spans)
agentrun = agentrun_summary(spans)
read_model = hwlab_read_model_summary(spans)
lag = projection_lag_summary(agentrun, read_model)
identity = identity_from_spans(spans)
candidates = root_cause_candidates(http_summary, agentrun, read_model, lag, error_spans)

# Step 4: mark each expected service as reached or missing; "complete" is
# true only when all three appear in the trace's service list.
expected_services = ["hwlab-cloud-api", "agentrun-manager", "agentrun-runner"]
service_path = {
    service: ("reached" if service in services else "missing")
    for service in expected_services
}
service_path["complete"] = all(service in services for service in expected_services)

# Step 5: collect headline facts. Both "confirmed" and "suspected" projection
# lag are reported under the same "HWLAB projection stale" fact.
facts = []
if http_summary.get("actorForbidden"):
    facts.append("actor forbidden")
if agentrun.get("terminalStatus") == "completed":
    facts.append("AgentRun completed")
if lag.get("status") in ("confirmed", "suspected"):
    facts.append("HWLAB projection stale")
if not facts:
    # Fallback so rootCause is never an empty string.
    facts.append("no dominant Code Agent root cause classified")
summary = {
    # rootCause is the facts joined with "; ".
    "rootCause": "; ".join(facts),
    "facts": facts,
    "classification": {
        "projectionLag": lag.get("status"),
        "runnerProvider": agentrun.get("runnerProviderClassification"),
        "actorForbidden": http_summary.get("actorForbidden"),
    },
}

# Step 6: bounded evidence for the default output - counts plus at most three
# reduced samples per category (the last three for projection spans).
evidence = {
    "httpProblemSpanCount": len(http_summary.get("problemSpans", [])),
    "httpProblemSpanSamples": [tiny_span(item) for item in http_summary.get("problemSpans", [])[:3]],
    "terminalSpanCount": len(agentrun.get("terminalSpans", [])),
    "terminalSpanSamples": [tiny_terminal(item) for item in agentrun.get("terminalSpans", [])[:3]],
    "traceEventReadSpans": [tiny_span(item) for item in read_model.get("traceEventReadSpans", [])[:3]],
    "turnStatusReadSpanCount": len(read_model.get("turnStatusReadSpans", [])),
    "turnStatusReadSpanSamples": [tiny_span(item) for item in read_model.get("turnStatusReadSpans", [])[:3]],
    "projectionSpanCount": len(read_model.get("projectionSpans", [])),
    "projectionSpanTail": [tiny_span(item) for item in read_model.get("projectionSpans", [])[-3:]],
    "errorSpanCount": len(error_spans),
    # Up to five distinct error span names, sorted.
    "errorSpanSampleNames": sorted(set(str(item.get("name") or "") for item in error_spans))[:5],
}

# Step 7: assemble the result document.
payload = {
    # ok requires both a successful kubectl call and at least one span.
    "ok": trace_rc == 0 and len(spans) > 0,
    "mapping": mapping,
    "tracePath": trace_path,
    "traceProxyPath": trace_proxy_path,
    "bodyBytes": len(trace_body.encode("utf-8")),
    "traceParseOk": True,
    "spanCount": len(spans),
    "services": services,
    "servicePath": service_path,
    "businessTraceIds": business_trace_ids[:20],
    "identity": identity,
    "agentrun": {
        "terminalStatus": agentrun.get("terminalStatus"),
        "latestSeq": agentrun.get("latestSeq"),
        "terminalEventType": agentrun.get("terminalEventType"),
        "runnerProviderClassification": agentrun.get("runnerProviderClassification"),
    },
    "hwlabReadModel": {
        "sourceEventCount": read_model.get("sourceEventCount"),
        "requestedSinceSeq": read_model.get("requestedSinceSeq"),
        "latestProjectedSeq": read_model.get("latestProjectedSeq"),
        "traceStatus": read_model.get("traceStatus"),
        "turnStatus": read_model.get("turnStatus"),
        "turnStatusCounts": read_model.get("turnStatusCounts"),
    },
    "http": {
        "statusCounts": http_summary.get("statusCounts", [])[:20],
        "problemCounts": http_summary.get("problemCounts", [])[:20],
        "actorForbidden": http_summary.get("actorForbidden"),
    },
    "projectionLag": lag,
    "summary": summary,
    "rootCauseCandidates": candidates,
    "spanNameCounts": flat["spanNameCounts"][:12],
    "evidence": evidence,
    # Follow-up commands; the %s slots are filled with the resolved OTel trace id.
    "next": {
        "diagnoseFull": "bun scripts/cli.ts platform-infra observability diagnose-code-agent --target ${target.id} --business-trace-id ${businessTraceIdForNext} --full",
        "traceSummary": "bun scripts/cli.ts platform-infra observability trace --target ${target.id} --trace-id %s --limit 80" % resolved_trace_id,
        "traceReads": "bun scripts/cli.ts platform-infra observability trace --target ${target.id} --trace-id %s --grep trace_events_read --limit 20 --full" % resolved_trace_id,
        "turnStatus": "bun scripts/cli.ts platform-infra observability trace --target ${target.id} --trace-id %s --grep turn_status_read --limit 20 --full" % resolved_trace_id,
        "projection": "bun scripts/cli.ts platform-infra observability trace --target ${target.id} --trace-id %s --grep projection_sync --limit 120 --full" % resolved_trace_id,
        "runnerTerminal": "bun scripts/cli.ts platform-infra observability trace --target ${target.id} --trace-id %s --grep runner_terminal --limit 20 --full" % resolved_trace_id,
        "raw": "bun scripts/cli.ts platform-infra observability diagnose-code-agent --target ${target.id} --trace-id %s --raw" % resolved_trace_id,
    },
    "stderrTail": (trace_err or "")[-4000:],
}
if FULL:
    # Full mode attaches span detail. Most lists are capped at LIMIT entries;
    # the two read-span lists are attached without a cap.
    payload["full"] = {
        "traceEventReadSpans": read_model.get("traceEventReadSpans", []),
        "turnStatusReadSpans": read_model.get("turnStatusReadSpans", []),
        "projectionSpans": read_model.get("projectionSpans", [])[:LIMIT],
        "terminalSpans": agentrun.get("terminalSpans", [])[:LIMIT],
        "errorSpans": [public_span(item) for item in error_spans[:LIMIT]],
        "httpProblemSpans": [public_span(item) for item in http_summary.get("problemSpans", [])[:LIMIT]],
        "selectedSpans": [public_span(item) for item in spans if item.get("name") in ("trace_events_read", "turn_status_read", "projection_sync", "command_result")][:LIMIT],
    }
if RAW:
    # Raw mode attaches the output of raw_trace_shape for the parsed trace.
    payload["raw"] = raw_trace_shape(trace_parsed)
# Pretty-print with indent=2 only in FULL or RAW mode; otherwise json.dumps
# receives indent=None. The exit status mirrors payload["ok"].
print(json.dumps(payload, ensure_ascii=False, indent=2 if FULL or RAW else None))
raise SystemExit(0 if payload["ok"] else 1)
PY
`;
}
function configSummary(observability: ObservabilityConfig, target: ObservabilityTarget): Record<string, unknown> {
return {
configPath: configLabel,
+1
View File
@@ -304,6 +304,7 @@ export function platformInfraHelp(): unknown {
"bun scripts/cli.ts platform-infra observability validate --target D601",
"bun scripts/cli.ts platform-infra observability trace --target D601 --trace-id <traceId> [--grep text] [--limit 40] [--full|--raw]",
"bun scripts/cli.ts platform-infra observability search --target D601 --grep 'no rollout found' [--lookback-minutes 360] [--candidate-limit 80] [--limit 20]",
"bun scripts/cli.ts platform-infra observability diagnose-code-agent --target D601 --business-trace-id <trc_...> [--full|--raw]",
],
description: "Operate YAML-controlled platform-infra services such as Sub2API, LangBot, n8n, WeChat archive workflows and OpenTelemetry tracing. Public services use PK01 Caddy+FRP rather than Kubernetes Ingress, NodePort, or LoadBalancer.",
target,