diff --git a/.agents/skills/unidesk-otel/SKILL.md b/.agents/skills/unidesk-otel/SKILL.md index 493a591c..b80df0cc 100644 --- a/.agents/skills/unidesk-otel/SKILL.md +++ b/.agents/skills/unidesk-otel/SKILL.md @@ -42,6 +42,19 @@ bun scripts/cli.ts platform-infra observability trace \ 默认输出不得展开完整 Tempo JSON;需要原始响应时才用 `--raw`。 +当只有错误文案而没有 OTel trace id 时,先用 `search` 从 Tempo 最近 trace 中反查候选,再进入 `trace`: + +```bash +bun scripts/cli.ts platform-infra observability search \ + --target D601 \ + --grep 'no rollout found' \ + --lookback-minutes 360 \ + --candidate-limit 80 \ + --limit 20 +``` + +`search` 会通过受控 service proxy 调 Tempo `/api/search` 取候选 trace,并逐条拉 trace 做本地 grep 摘要;默认只输出匹配 trace、服务、业务 traceId、错误 span 和下一步命令。扩大时间窗或候选数必须显式传 `--lookback-minutes` / `--candidate-limit`,避免大 trace 输出淹没上下文。 + ## 噪声压制 按错误文案、span 名、failureKind 或关键属性定位时,优先用 `--grep`: diff --git a/scripts/src/platform-infra-observability.ts b/scripts/src/platform-infra-observability.ts index 76cd3ffd..54c1455d 100644 --- a/scripts/src/platform-infra-observability.ts +++ b/scripts/src/platform-infra-observability.ts @@ -130,9 +130,17 @@ interface TraceOptions extends CommonOptions { limit: number; } +interface SearchOptions extends CommonOptions { + grep: string | null; + query: string | null; + limit: number; + candidateLimit: number; + lookbackMinutes: number; +} + export function observabilityHelp(): Record { return { - command: "platform-infra observability plan|apply|status|validate|trace", + command: "platform-infra observability plan|apply|status|validate|trace|search", output: "json", configTruth: "config/platform-infra/observability.yaml", spec: "PJ2026-01060501 OTel追踪 draft-2026-06-19-p0", @@ -143,6 +151,7 @@ export function observabilityHelp(): Record { "bun scripts/cli.ts platform-infra observability status --target D601 [--full|--raw]", "bun scripts/cli.ts platform-infra observability validate --target D601 [--full|--raw]", "bun scripts/cli.ts platform-infra observability trace --target D601 --trace-id [--grep provider-stream-disconnected] [--limit 40] [--full|--raw]", + "bun scripts/cli.ts platform-infra observability search --target D601 --grep 'no rollout found' [--lookback-minutes 360] [--candidate-limit 80] [--limit 20] [--full|--raw]", ], boundary: "Prometheus remains the metrics source; this command owns only platform-infra OTel Collector, trace backend readiness, and trace lookup.", }; @@ -155,6 +164,7 @@ export async function runPlatformObservabilityCommand(config: UniDeskConfig, arg if (action === "status") return await status(config, parseCommonOptions(args.slice(1))); if (action === "validate") return await validate(config, parseCommonOptions(args.slice(1))); if (action === "trace") return await trace(config, parseTraceOptions(args.slice(1))); + if (action === "search") return await search(config, parseSearchOptions(args.slice(1))); return { ok: false, error: "unsupported-platform-infra-observability-command", args, help: observabilityHelp() }; } @@ -240,6 +250,58 @@ function parseTraceOptions(args: string[]): TraceOptions { return { ...parseCommonOptions(commonArgs), traceId, grep, limit }; } +function parseSearchOptions(args: string[]): SearchOptions { + const commonArgs: string[] = []; + let grep: string | null = null; + let query: string | null = null; + let limit = 20; + let candidateLimit = 80; + let lookbackMinutes = 360; + for (let index = 0; index < args.length; index += 1) { + const arg = args[index]; + if (arg === "--grep") { + const value = args[index + 1]; + if (value === undefined || value.startsWith("--")) throw new Error("--grep requires a value"); + grep = value; + index += 1; + } else if (arg === "--query" || arg === "--q") { + const value = args[index + 1]; + if (value === undefined || value.startsWith("--")) throw new Error(`${arg} requires a value`); + query = value; + index += 1; + } else if (arg === "--limit") { + const value = args[index + 1]; + if (value === undefined || value.startsWith("--")) throw new Error("--limit requires a value"); + const parsed = Number(value); + if (!Number.isInteger(parsed) || parsed < 1 || parsed > 100) throw new Error("--limit must be an integer from 1 to 100"); + limit = parsed; + index += 1; + } else if (arg === "--candidate-limit") { + const value = args[index + 1]; + if (value === undefined || value.startsWith("--")) throw new Error("--candidate-limit requires a value"); + const parsed = Number(value); + if (!Number.isInteger(parsed) || parsed < 1 || parsed > 500) throw new Error("--candidate-limit must be an integer from 1 to 500"); + candidateLimit = parsed; + index += 1; + } else if (arg === "--lookback-minutes" || arg === "--since-minutes") { + const value = args[index + 1]; + if (value === undefined || value.startsWith("--")) throw new Error(`${arg} requires a value`); + const parsed = Number(value); + if (!Number.isInteger(parsed) || parsed < 1 || parsed > 10080) throw new Error(`${arg} must be an integer from 1 to 10080`); + lookbackMinutes = parsed; + index += 1; + } else { + commonArgs.push(arg); + if (arg === "--target") { + commonArgs.push(args[index + 1] ?? ""); + index += 1; + } + } + } + if (grep === null && query === null) throw new Error("observability search requires --grep or --query "); + return { ...parseCommonOptions(commonArgs), grep, query, limit, candidateLimit, lookbackMinutes }; +} + function readObservabilityConfig(): ObservabilityConfig { const parsed = Bun.YAML.parse(readFileSync(configFile, "utf8")) as unknown; const root = asRecord(parsed, configLabel); @@ -516,6 +578,39 @@ async function trace(config: UniDeskConfig, options: TraceOptions): Promise> { + const observability = readObservabilityConfig(); + const target = resolveTarget(observability, options.targetId); + const endSeconds = Math.floor(Date.now() / 1000); + const startSeconds = endSeconds - (options.lookbackMinutes * 60); + const params = new URLSearchParams({ + start: String(startSeconds), + end: String(endSeconds), + limit: String(options.candidateLimit), + }); + if (options.query !== null) params.set("q", options.query); + const searchPath = `/api/search?${params.toString()}`; + const result = await capture(config, target.route, ["sh"], searchScript(observability, target, searchPath, options)); + const parsed = parseJsonOutput(result.stdout); + return { + ok: result.exitCode === 0 && parsed?.ok === true, + action: "platform-infra-observability-search", + mutation: false, + target: targetSummary(target), + query: { + backend: observability.traceBackend.type, + service: observability.traceBackend.serviceName, + path: searchPath, + grep: options.grep, + tempoQuery: options.query, + lookbackMinutes: options.lookbackMinutes, + candidateLimit: options.candidateLimit, + limit: options.limit, + }, + result: redactSensitiveUnknown(parsed) ?? compactCapture(result, { full: true }), + }; +} + function renderManifest(observability: ObservabilityConfig, target: ObservabilityTarget): string { const collectorImage = imageReference(observability.images.collector); const tempoImage = imageReference(observability.images.tempo); @@ -1127,7 +1222,9 @@ IMPORTANT_ATTRS = [ "traceId", "otel.trace_id", "agentrun.stage", "runId", "commandId", "sessionId", "turnId", "threadId", "failureKind", "willRetry", "terminalStatus", "phase", "message", "http.route", "http.status_code", - "http.method", "eventType", "runnerId", "attemptId", + "http.method", "eventType", "runnerId", "attemptId", "storageKind", + "storagePvcName", "storagePvcPhase", "resumeMode", "rolloutId", + "error", "error.message", "exception.type", "exception.message", ] IMPORTANT_NAMES = { "durable_admission", "billing_preflight", "agentrun_dispatch", @@ -1338,6 +1435,309 @@ PY `; } +function searchScript(observability: ObservabilityConfig, target: ObservabilityTarget, searchPath: string, options: SearchOptions): string { + const proxyPrefix = `/api/v1/namespaces/${target.namespace}/services/http:${observability.traceBackend.serviceName}:http/proxy`; + const searchProxyPath = `${proxyPrefix}${searchPath}`; + const grepLiteral = options.grep === null ? "None" : JSON.stringify(options.grep); + const queryLiteral = options.query === null ? "None" : JSON.stringify(options.query); + return ` +set -u +python3 - <<'PY' +import collections, json, subprocess, time + +SEARCH_PROXY_PATH = ${JSON.stringify(searchProxyPath)} +TRACE_PROXY_PREFIX = ${JSON.stringify(proxyPrefix)} +TRACE_PATH_TEMPLATE = ${JSON.stringify(observability.probes.traceQueryPathTemplate)} +FULL = ${options.full ? "True" : "False"} +RAW = ${options.raw ? "True" : "False"} +GREP = ${grepLiteral} +QUERY = ${queryLiteral} +LIMIT = ${options.limit} +CANDIDATE_LIMIT = ${options.candidateLimit} +DEADLINE = time.time() + 50 +IMPORTANT_ATTRS = [ + "traceId", "otel.trace_id", "agentrun.stage", "runId", "commandId", + "sessionId", "turnId", "threadId", "failureKind", "willRetry", + "terminalStatus", "phase", "message", "http.route", "http.status_code", + "http.method", "eventType", "runnerId", "attemptId", "storageKind", + "storagePvcName", "storagePvcPhase", "resumeMode", "rolloutId", + "error", "error.message", "exception.type", "exception.message", +] + +def run_kubectl(proxy_path, timeout=10): + try: + proc = subprocess.run( + ["kubectl", "get", "--raw", proxy_path], + capture_output=True, + text=True, + timeout=timeout, + ) + return proc.returncode, proc.stdout, proc.stderr + except subprocess.TimeoutExpired as exc: + return 124, exc.stdout or "", "timeout while querying tempo" + except Exception as exc: + return 125, "", str(exc) + +def parse_json(body): + try: + return json.loads(body) if body else None + except Exception: + return None + +def attr_value(value): + if not isinstance(value, dict): + return value + inner = value.get("value", value) + if not isinstance(inner, dict): + return inner + for key in ("stringValue", "intValue", "doubleValue", "boolValue"): + if key in inner: + return inner.get(key) + if "arrayValue" in inner: + return "" + if "kvlistValue" in inner: + return "" + if "bytesValue" in inner: + return "" + return inner + +def attrs_to_dict(attrs): + if not isinstance(attrs, list): + return {} + output = {} + for item in attrs: + if not isinstance(item, dict): + continue + key = item.get("key") + if isinstance(key, str): + output[key] = attr_value(item.get("value")) + return output + +def selected_attrs(attrs): + output = {} + for key in IMPORTANT_ATTRS: + if key in attrs: + output[key] = attrs[key] + return output + +def nanos_to_ms(start, end): + try: + return round((int(end) - int(start)) / 1000000, 3) + except Exception: + return None + +def span_status_code(status): + if not isinstance(status, dict): + return None + return status.get("code") + +def is_error_span(span, attrs): + status = span.get("status") if isinstance(span, dict) else {} + code = span_status_code(status) + name = str(span.get("name", "")).lower() if isinstance(span, dict) else "" + failure = str(attrs.get("failureKind", "")).strip() + terminal = str(attrs.get("terminalStatus", "")).strip() + return code in ("STATUS_CODE_ERROR", 2) or "error" in name or bool(failure) or terminal in ("failed", "blocked") + +def batches_from_body(body): + if not isinstance(body, dict): + return [] + for key in ("batches", "resourceSpans"): + value = body.get(key) + if isinstance(value, list): + return value + return [] + +def scope_spans_from_batch(batch): + if not isinstance(batch, dict): + return [] + for key in ("scopeSpans", "instrumentationLibrarySpans"): + value = batch.get(key) + if isinstance(value, list): + return value + return [] + +def compact_span(span, service, resource_attrs, scope_name): + attrs = attrs_to_dict(span.get("attributes")) + status = span.get("status") if isinstance(span.get("status"), dict) else {} + item = { + "name": span.get("name"), + "service": service, + "scope": scope_name, + "statusCode": span_status_code(status), + "statusMessage": status.get("message"), + "durationMs": nanos_to_ms(span.get("startTimeUnixNano"), span.get("endTimeUnixNano")), + "attributes": selected_attrs(attrs), + } + if "deployment.environment" in resource_attrs: + item["environment"] = resource_attrs.get("deployment.environment") + if "git.commit" in resource_attrs: + item["gitCommit"] = resource_attrs.get("git.commit") + return item + +def grep_matches_text(text): + return GREP is not None and GREP.lower() in text.lower() + +def grep_matches_item(item): + return GREP is not None and grep_matches_text(json.dumps(item, ensure_ascii=False, sort_keys=True)) + +def extract_traces(search_body): + if not isinstance(search_body, dict): + return [] + value = search_body.get("traces") + if isinstance(value, list): + return value + value = search_body.get("data") + if isinstance(value, list): + return value + return [] + +def trace_id_from_meta(meta): + if not isinstance(meta, dict): + return None + for key in ("traceID", "traceId", "trace_id"): + value = meta.get(key) + if isinstance(value, str) and value: + return value + return None + +def compact_meta(meta): + if not isinstance(meta, dict): + return {} + output = {} + for key in ("traceID", "traceId", "rootServiceName", "rootTraceName", "startTimeUnixNano", "durationMs", "spanSet", "spanSets"): + if key in meta: + output[key] = meta[key] + return output + +def trace_summary(trace_id, meta, body, rc, stderr): + parsed = parse_json(body) + raw_matched = grep_matches_text(body) + if not isinstance(parsed, dict): + return { + "traceId": trace_id, + "traceCommand": "bun scripts/cli.ts platform-infra observability trace --target ${target.id} --trace-id %s --limit 80" % trace_id, + "meta": compact_meta(meta), + "queryOk": rc == 0, + "parseOk": False, + "rawMatched": raw_matched, + "bodyBytes": len(body.encode("utf-8")), + "stderrTail": (stderr or "")[-1000:], + } + services = set() + business_trace_ids = set() + name_counts = collections.Counter() + spans = [] + error_spans = [] + matched_spans = [] + for batch in batches_from_body(parsed): + resource = batch.get("resource") if isinstance(batch.get("resource"), dict) else {} + resource_attrs = attrs_to_dict(resource.get("attributes")) + service = resource_attrs.get("service.name") or "" + services.add(service) + for scope_span in scope_spans_from_batch(batch): + scope = scope_span.get("scope") if isinstance(scope_span.get("scope"), dict) else {} + scope_name = scope.get("name") + raw_spans = scope_span.get("spans") if isinstance(scope_span.get("spans"), list) else [] + for span in raw_spans: + if not isinstance(span, dict): + continue + item = compact_span(span, service, resource_attrs, scope_name) + attrs = item.get("attributes", {}) + if isinstance(attrs, dict) and isinstance(attrs.get("traceId"), str): + business_trace_ids.add(attrs.get("traceId")) + name = str(item.get("name") or "") + name_counts[name] += 1 + spans.append(item) + if is_error_span(span, attrs if isinstance(attrs, dict) else {}): + error_spans.append(item) + if grep_matches_item(item): + matched_spans.append(item) + return { + "traceId": trace_id, + "traceCommand": "bun scripts/cli.ts platform-infra observability trace --target ${target.id} --trace-id %s --limit 80" % trace_id, + "grepCommand": "bun scripts/cli.ts platform-infra observability trace --target ${target.id} --trace-id %s --grep %s --limit 80" % (trace_id, json.dumps(GREP) if GREP is not None else ""), + "meta": compact_meta(meta), + "queryOk": rc == 0, + "parseOk": True, + "rawMatched": raw_matched, + "bodyBytes": len(body.encode("utf-8")), + "spanCount": len(spans), + "serviceCount": len(services), + "services": sorted(services), + "businessTraceIds": sorted(business_trace_ids)[:20], + "errorSpanCount": len(error_spans), + "matchedSpanCount": len(matched_spans) if GREP is not None else None, + "spanNameCounts": [{"name": name, "count": count} for name, count in name_counts.most_common(10)], + "errorSpans": error_spans[:LIMIT] if FULL else error_spans[: min(3, LIMIT)], + "matchedSpans": matched_spans[:LIMIT], + "stderrTail": (stderr or "")[-1000:], + } + +search_rc, search_body, search_err = run_kubectl(SEARCH_PROXY_PATH, timeout=15) +search_parsed = parse_json(search_body) +search_parse_ok = isinstance(search_parsed, dict) +trace_metas = extract_traces(search_parsed) +candidate_trace_ids = [] +seen = set() +for meta in trace_metas: + trace_id = trace_id_from_meta(meta) + if trace_id is None or trace_id in seen: + continue + seen.add(trace_id) + candidate_trace_ids.append((trace_id, meta)) + if len(candidate_trace_ids) >= CANDIDATE_LIMIT: + break + +matched = [] +scanned = [] +scan_stopped = None +for trace_id, meta in candidate_trace_ids: + if time.time() > DEADLINE: + scan_stopped = "deadline" + break + trace_path = TRACE_PATH_TEMPLATE.replace("{{traceId}}", trace_id) + trace_rc, trace_body, trace_err = run_kubectl(TRACE_PROXY_PREFIX + trace_path, timeout=8) + summary = trace_summary(trace_id, meta, trace_body, trace_rc, trace_err) + scanned.append(summary) + if GREP is None or summary.get("rawMatched") is True or (summary.get("matchedSpanCount") or 0) > 0: + matched.append(summary) + if len(matched) >= LIMIT and GREP is not None: + break + +payload = { + "ok": search_rc == 0 and search_parse_ok, + "searchPath": "${searchPath}", + "searchProxyPath": SEARCH_PROXY_PATH, + "grep": GREP, + "tempoQuery": QUERY, + "limit": LIMIT, + "candidateLimit": CANDIDATE_LIMIT, + "searchParseOk": search_parse_ok, + "candidateTraceCount": len(candidate_trace_ids), + "scannedTraceCount": len(scanned), + "matchedTraceCount": len(matched), + "scanStopped": scan_stopped, + "traces": matched[:LIMIT] if GREP is not None else scanned[:LIMIT], + "truncated": { + "candidateTraces": len(trace_metas) > len(candidate_trace_ids), + "matchedTraces": len(matched) > LIMIT, + }, + "next": { + "expandWindow": "bun scripts/cli.ts platform-infra observability search --target ${target.id} --grep --lookback-minutes 1440 --candidate-limit 200 --limit 40", + "traceDetail": "bun scripts/cli.ts platform-infra observability trace --target ${target.id} --trace-id --grep --limit 80", + "raw": "bun scripts/cli.ts platform-infra observability search --target ${target.id} --grep --raw", + }, + "searchStderrTail": (search_err or "")[-2000:], +} +if RAW: + payload["rawSearchBody"] = search_parsed if search_parse_ok else search_body[-12000:] +print(json.dumps(payload, ensure_ascii=False, indent=2)) +raise SystemExit(0 if payload["ok"] else 1) +PY +`; +} + function configSummary(observability: ObservabilityConfig, target: ObservabilityTarget): Record { return { configPath: configLabel, diff --git a/scripts/src/platform-infra.ts b/scripts/src/platform-infra.ts index 14896573..354ece7d 100644 --- a/scripts/src/platform-infra.ts +++ b/scripts/src/platform-infra.ts @@ -303,6 +303,7 @@ export function platformInfraHelp(): unknown { "bun scripts/cli.ts platform-infra observability status --target D601", "bun scripts/cli.ts platform-infra observability validate --target D601", "bun scripts/cli.ts platform-infra observability trace --target D601 --trace-id [--grep text] [--limit 40] [--full|--raw]", + "bun scripts/cli.ts platform-infra observability search --target D601 --grep 'no rollout found' [--lookback-minutes 360] [--candidate-limit 80] [--limit 20]", ], description: "Operate YAML-controlled platform-infra services such as Sub2API, LangBot, n8n, WeChat archive workflows and OpenTelemetry tracing. Public services use PK01 Caddy+FRP rather than Kubernetes Ingress, NodePort, or LoadBalancer.", target,