feat: 支持按错误文案搜索 OTel trace

This commit is contained in:
Codex
2026-06-20 02:00:52 +00:00
parent 4928cfc9b8
commit e64c01e458
3 changed files with 416 additions and 2 deletions
+13
View File
@@ -42,6 +42,19 @@ bun scripts/cli.ts platform-infra observability trace \
默认输出不得展开完整 Tempo JSON;需要原始响应时才用 `--raw`。
当只有错误文案而没有 OTel trace id 时,先用 `search` 从 Tempo 最近 trace 中反查候选,再进入 `trace`:
```bash
bun scripts/cli.ts platform-infra observability search \
--target D601 \
--grep 'no rollout found' \
--lookback-minutes 360 \
--candidate-limit 80 \
--limit 20
```
`search` 会通过受控 service proxy 调 Tempo `/api/search` 取候选 trace,并逐条拉 trace 做本地 grep 摘要;默认只输出匹配 trace、服务、业务 traceId、错误 span 和下一步命令。扩大时间窗或候选数必须显式传 `--lookback-minutes` / `--candidate-limit`,避免大 trace 输出淹没上下文。
## 噪声压制
按错误文案、span 名、failureKind 或关键属性定位时,优先用 `--grep`
+402 -2
View File
@@ -130,9 +130,17 @@ interface TraceOptions extends CommonOptions {
limit: number;
}
/** Options for `observability search`: the shared CommonOptions plus search-specific knobs. */
interface SearchOptions extends CommonOptions {
// Text from --grep, matched case-insensitively against each fetched trace; null when not given.
grep: string | null;
// Value of --query / --q, forwarded to Tempo /api/search as the `q` parameter; null when not given.
query: string | null;
// Value of --limit (1-100, default 20): caps the traces and per-trace spans that are reported.
limit: number;
// Value of --candidate-limit (1-500, default 80): sent to Tempo as `limit` and caps the traces scanned.
candidateLimit: number;
// Value of --lookback-minutes / --since-minutes (1-10080, default 360): the search window ends now and starts this many minutes earlier.
lookbackMinutes: number;
}
export function observabilityHelp(): Record<string, unknown> {
return {
command: "platform-infra observability plan|apply|status|validate|trace",
command: "platform-infra observability plan|apply|status|validate|trace|search",
output: "json",
configTruth: "config/platform-infra/observability.yaml",
spec: "PJ2026-01060501 OTel追踪 draft-2026-06-19-p0",
@@ -143,6 +151,7 @@ export function observabilityHelp(): Record<string, unknown> {
"bun scripts/cli.ts platform-infra observability status --target D601 [--full|--raw]",
"bun scripts/cli.ts platform-infra observability validate --target D601 [--full|--raw]",
"bun scripts/cli.ts platform-infra observability trace --target D601 --trace-id <traceId> [--grep provider-stream-disconnected] [--limit 40] [--full|--raw]",
"bun scripts/cli.ts platform-infra observability search --target D601 --grep 'no rollout found' [--lookback-minutes 360] [--candidate-limit 80] [--limit 20] [--full|--raw]",
],
boundary: "Prometheus remains the metrics source; this command owns only platform-infra OTel Collector, trace backend readiness, and trace lookup.",
};
@@ -155,6 +164,7 @@ export async function runPlatformObservabilityCommand(config: UniDeskConfig, arg
if (action === "status") return await status(config, parseCommonOptions(args.slice(1)));
if (action === "validate") return await validate(config, parseCommonOptions(args.slice(1)));
if (action === "trace") return await trace(config, parseTraceOptions(args.slice(1)));
if (action === "search") return await search(config, parseSearchOptions(args.slice(1)));
return { ok: false, error: "unsupported-platform-infra-observability-command", args, help: observabilityHelp() };
}
@@ -240,6 +250,58 @@ function parseTraceOptions(args: string[]): TraceOptions {
return { ...parseCommonOptions(commonArgs), traceId, grep, limit };
}
/**
 * Parses the argument tail of `platform-infra observability search`.
 *
 * Search-specific flags (`--grep`, `--query`/`--q`, `--limit`, `--candidate-limit`,
 * `--lookback-minutes`/`--since-minutes`) are consumed here; every other argument
 * (including `--target <id>`) is forwarded to `parseCommonOptions`.
 *
 * @param args Arguments following the `search` action.
 * @returns The merged common and search options.
 * @throws Error when a flag is missing its value, a text value is blank, a numeric
 *   value is out of range, or neither `--grep` nor `--query` is supplied.
 */
function parseSearchOptions(args: string[]): SearchOptions {
  // Returns the token following the flag at `flagIndex`; another flag is not a value.
  const takeValue = (flagIndex: number, flag: string): string => {
    const value = args[flagIndex + 1];
    if (value === undefined || value.startsWith("--")) throw new Error(`${flag} requires a value`);
    return value;
  };
  // Like takeValue, but rejects blank text: an empty grep would match every trace.
  const takeText = (flagIndex: number, flag: string): string => {
    const value = takeValue(flagIndex, flag);
    if (value.trim() === "") throw new Error(`${flag} requires a non-empty value`);
    return value;
  };
  // Reads an integer in the inclusive range 1..max.
  const takeInteger = (flagIndex: number, flag: string, max: number): number => {
    const parsed = Number(takeValue(flagIndex, flag));
    if (!Number.isInteger(parsed) || parsed < 1 || parsed > max) {
      throw new Error(`${flag} must be an integer from 1 to ${max}`);
    }
    return parsed;
  };

  const commonArgs: string[] = [];
  let grep: string | null = null;
  let query: string | null = null;
  let limit = 20;
  let candidateLimit = 80;
  let lookbackMinutes = 360;

  for (let index = 0; index < args.length; index += 1) {
    const arg = args[index];
    switch (arg) {
      case "--grep":
        grep = takeText(index, arg);
        index += 1;
        break;
      case "--query":
      case "--q":
        query = takeText(index, arg);
        index += 1;
        break;
      case "--limit":
        limit = takeInteger(index, arg, 100);
        index += 1;
        break;
      case "--candidate-limit":
        candidateLimit = takeInteger(index, arg, 500);
        index += 1;
        break;
      case "--lookback-minutes":
      case "--since-minutes":
        lookbackMinutes = takeInteger(index, arg, 10080);
        index += 1;
        break;
      default:
        commonArgs.push(arg);
        // Keep the target id paired with its flag so parseCommonOptions sees both.
        if (arg === "--target") {
          commonArgs.push(args[index + 1] ?? "");
          index += 1;
        }
    }
  }

  if (grep === null && query === null) throw new Error("observability search requires --grep <text> or --query <tempo-query>");
  return { ...parseCommonOptions(commonArgs), grep, query, limit, candidateLimit, lookbackMinutes };
}
function readObservabilityConfig(): ObservabilityConfig {
const parsed = Bun.YAML.parse(readFileSync(configFile, "utf8")) as unknown;
const root = asRecord(parsed, configLabel);
@@ -516,6 +578,39 @@ async function trace(config: UniDeskConfig, options: TraceOptions): Promise<Reco
};
}
/**
 * Runs `observability search`: asks Tempo for recent candidate traces through the
 * generated shell script and wraps the script's JSON output in a command result.
 *
 * @param config UniDesk configuration passed through to `capture`.
 * @param options Parsed search options (window, candidate cap, grep/query, limits).
 * @returns A read-only (`mutation: false`) result describing the query and its outcome.
 */
async function search(config: UniDeskConfig, options: SearchOptions): Promise<Record<string, unknown>> {
  const { candidateLimit, grep, limit, lookbackMinutes, query } = options;
  const observability = readObservabilityConfig();
  const target = resolveTarget(observability, options.targetId);

  // The search window ends now and reaches lookbackMinutes into the past.
  const nowSeconds = Math.floor(Date.now() / 1000);
  const tempoParams = new URLSearchParams();
  tempoParams.set("start", String(nowSeconds - lookbackMinutes * 60));
  tempoParams.set("end", String(nowSeconds));
  tempoParams.set("limit", String(candidateLimit));
  if (query !== null) {
    tempoParams.set("q", query);
  }
  const searchPath = "/api/search?" + tempoParams.toString();

  const script = searchScript(observability, target, searchPath, options);
  const result = await capture(config, target.route, ["sh"], script);
  const parsed = parseJsonOutput(result.stdout);
  const succeeded = result.exitCode === 0 && parsed?.ok === true;

  return {
    ok: succeeded,
    action: "platform-infra-observability-search",
    mutation: false,
    target: targetSummary(target),
    query: {
      backend: observability.traceBackend.type,
      service: observability.traceBackend.serviceName,
      path: searchPath,
      grep,
      tempoQuery: query,
      lookbackMinutes,
      candidateLimit,
      limit,
    },
    // Fall back to the captured process output when the script printed no usable JSON.
    result: redactSensitiveUnknown(parsed) ?? compactCapture(result, { full: true }),
  };
}
function renderManifest(observability: ObservabilityConfig, target: ObservabilityTarget): string {
const collectorImage = imageReference(observability.images.collector);
const tempoImage = imageReference(observability.images.tempo);
@@ -1127,7 +1222,9 @@ IMPORTANT_ATTRS = [
"traceId", "otel.trace_id", "agentrun.stage", "runId", "commandId",
"sessionId", "turnId", "threadId", "failureKind", "willRetry",
"terminalStatus", "phase", "message", "http.route", "http.status_code",
"http.method", "eventType", "runnerId", "attemptId",
"http.method", "eventType", "runnerId", "attemptId", "storageKind",
"storagePvcName", "storagePvcPhase", "resumeMode", "rolloutId",
"error", "error.message", "exception.type", "exception.message",
]
IMPORTANT_NAMES = {
"durable_admission", "billing_preflight", "agentrun_dispatch",
@@ -1338,6 +1435,309 @@ PY
`;
}
/**
 * Builds the shell script (a Python program fed to `python3` through a quoted heredoc)
 * that searches Tempo for candidate traces and greps each one locally.
 *
 * The script queries `searchPath` through the Kubernetes service proxy, fetches every
 * candidate trace, summarises it, and prints one JSON payload. It exits 0 only when the
 * search request succeeded and returned a JSON object.
 *
 * All dynamic values (paths, target id, grep text, query) are emitted as JSON string
 * literals, which are also valid Python string literals, so quotes in them cannot break
 * the generated program.
 *
 * @param observability Observability config; supplies the trace backend service name and
 *   the trace query path template.
 * @param target Target whose namespace and id are used for the proxy path and hint commands.
 * @param searchPath Tempo search path including its query string.
 * @param options Parsed search options (grep, query, limits, full/raw flags).
 * @returns The script text to run with `sh`.
 */
function searchScript(observability: ObservabilityConfig, target: ObservabilityTarget, searchPath: string, options: SearchOptions): string {
  const proxyPrefix = `/api/v1/namespaces/${target.namespace}/services/http:${observability.traceBackend.serviceName}:http/proxy`;
  const searchProxyPath = `${proxyPrefix}${searchPath}`;
  const grepLiteral = options.grep === null ? "None" : JSON.stringify(options.grep);
  const queryLiteral = options.query === null ? "None" : JSON.stringify(options.query);
  return `
set -u
python3 - <<'PY'
import collections, json, shlex, subprocess, time

SEARCH_PATH = ${JSON.stringify(searchPath)}
SEARCH_PROXY_PATH = ${JSON.stringify(searchProxyPath)}
TRACE_PROXY_PREFIX = ${JSON.stringify(proxyPrefix)}
TRACE_PATH_TEMPLATE = ${JSON.stringify(observability.probes.traceQueryPathTemplate)}
TARGET_ID = ${JSON.stringify(target.id)}
FULL = ${options.full ? "True" : "False"}
RAW = ${options.raw ? "True" : "False"}
GREP = ${grepLiteral}
QUERY = ${queryLiteral}
LIMIT = ${options.limit}
CANDIDATE_LIMIT = ${options.candidateLimit}
DEADLINE = time.time() + 50
# Shell-quoted target id for the suggested follow-up commands.
TARGET_ARG = shlex.quote(TARGET_ID)
IMPORTANT_ATTRS = [
    "traceId", "otel.trace_id", "agentrun.stage", "runId", "commandId",
    "sessionId", "turnId", "threadId", "failureKind", "willRetry",
    "terminalStatus", "phase", "message", "http.route", "http.status_code",
    "http.method", "eventType", "runnerId", "attemptId", "storageKind",
    "storagePvcName", "storagePvcPhase", "resumeMode", "rolloutId",
    "error", "error.message", "exception.type", "exception.message",
]

def as_text(data):
    # TimeoutExpired may carry partial output as bytes even when text=True was requested.
    if isinstance(data, bytes):
        return data.decode("utf-8", errors="replace")
    return data or ""

def run_kubectl(proxy_path, timeout=10):
    try:
        proc = subprocess.run(
            ["kubectl", "get", "--raw", proxy_path],
            capture_output=True,
            text=True,
            timeout=timeout,
        )
        return proc.returncode, proc.stdout, proc.stderr
    except subprocess.TimeoutExpired as exc:
        return 124, as_text(exc.stdout), "timeout while querying tempo"
    except Exception as exc:
        return 125, "", str(exc)

def parse_json(body):
    try:
        return json.loads(body) if body else None
    except Exception:
        return None

def attr_value(value):
    if not isinstance(value, dict):
        return value
    inner = value.get("value", value)
    if not isinstance(inner, dict):
        return inner
    for key in ("stringValue", "intValue", "doubleValue", "boolValue"):
        if key in inner:
            return inner.get(key)
    if "arrayValue" in inner:
        return "<array>"
    if "kvlistValue" in inner:
        return "<kvlist>"
    if "bytesValue" in inner:
        return "<bytes>"
    return inner

def attrs_to_dict(attrs):
    if not isinstance(attrs, list):
        return {}
    output = {}
    for item in attrs:
        if not isinstance(item, dict):
            continue
        key = item.get("key")
        if isinstance(key, str):
            output[key] = attr_value(item.get("value"))
    return output

def selected_attrs(attrs):
    output = {}
    for key in IMPORTANT_ATTRS:
        if key in attrs:
            output[key] = attrs[key]
    return output

def nanos_to_ms(start, end):
    try:
        return round((int(end) - int(start)) / 1000000, 3)
    except Exception:
        return None

def span_status_code(status):
    if not isinstance(status, dict):
        return None
    return status.get("code")

def is_error_span(span, attrs):
    status = span.get("status") if isinstance(span, dict) else {}
    code = span_status_code(status)
    name = str(span.get("name", "")).lower() if isinstance(span, dict) else ""
    failure = str(attrs.get("failureKind", "")).strip()
    terminal = str(attrs.get("terminalStatus", "")).strip()
    return code in ("STATUS_CODE_ERROR", 2) or "error" in name or bool(failure) or terminal in ("failed", "blocked")

def batches_from_body(body):
    if not isinstance(body, dict):
        return []
    for key in ("batches", "resourceSpans"):
        value = body.get(key)
        if isinstance(value, list):
            return value
    return []

def scope_spans_from_batch(batch):
    if not isinstance(batch, dict):
        return []
    for key in ("scopeSpans", "instrumentationLibrarySpans"):
        value = batch.get(key)
        if isinstance(value, list):
            return value
    return []

def compact_span(span, service, resource_attrs, scope_name):
    attrs = attrs_to_dict(span.get("attributes"))
    status = span.get("status") if isinstance(span.get("status"), dict) else {}
    item = {
        "name": span.get("name"),
        "service": service,
        "scope": scope_name,
        "statusCode": span_status_code(status),
        "statusMessage": status.get("message"),
        "durationMs": nanos_to_ms(span.get("startTimeUnixNano"), span.get("endTimeUnixNano")),
        "attributes": selected_attrs(attrs),
    }
    if "deployment.environment" in resource_attrs:
        item["environment"] = resource_attrs.get("deployment.environment")
    if "git.commit" in resource_attrs:
        item["gitCommit"] = resource_attrs.get("git.commit")
    return item

def grep_matches_text(text):
    return GREP is not None and GREP.lower() in text.lower()

def grep_matches_item(item):
    return GREP is not None and grep_matches_text(json.dumps(item, ensure_ascii=False, sort_keys=True))

def extract_traces(search_body):
    if not isinstance(search_body, dict):
        return []
    value = search_body.get("traces")
    if isinstance(value, list):
        return value
    value = search_body.get("data")
    if isinstance(value, list):
        return value
    return []

def trace_id_from_meta(meta):
    if not isinstance(meta, dict):
        return None
    for key in ("traceID", "traceId", "trace_id"):
        value = meta.get(key)
        if isinstance(value, str) and value:
            return value
    return None

def compact_meta(meta):
    if not isinstance(meta, dict):
        return {}
    output = {}
    for key in ("traceID", "traceId", "rootServiceName", "rootTraceName", "startTimeUnixNano", "durationMs", "spanSet", "spanSets"):
        if key in meta:
            output[key] = meta[key]
    return output

def trace_command(trace_id):
    return "bun scripts/cli.ts platform-infra observability trace --target %s --trace-id %s --limit 80" % (TARGET_ARG, trace_id)

def grep_command(trace_id):
    # shlex.quote keeps the grep text copy-pasteable in a shell, including non-ASCII text.
    grep_arg = shlex.quote(GREP) if GREP is not None else "<text>"
    return "bun scripts/cli.ts platform-infra observability trace --target %s --trace-id %s --grep %s --limit 80" % (TARGET_ARG, trace_id, grep_arg)

def trace_summary(trace_id, meta, body, rc, stderr):
    parsed = parse_json(body)
    raw_matched = grep_matches_text(body)
    if not isinstance(parsed, dict):
        return {
            "traceId": trace_id,
            "traceCommand": trace_command(trace_id),
            "meta": compact_meta(meta),
            "queryOk": rc == 0,
            "parseOk": False,
            "rawMatched": raw_matched,
            "bodyBytes": len(body.encode("utf-8")),
            "stderrTail": (stderr or "")[-1000:],
        }
    services = set()
    business_trace_ids = set()
    name_counts = collections.Counter()
    spans = []
    error_spans = []
    matched_spans = []
    for batch in batches_from_body(parsed):
        resource = batch.get("resource") if isinstance(batch.get("resource"), dict) else {}
        resource_attrs = attrs_to_dict(resource.get("attributes"))
        service = resource_attrs.get("service.name") or "<unknown-service>"
        services.add(service)
        for scope_span in scope_spans_from_batch(batch):
            scope = scope_span.get("scope") if isinstance(scope_span.get("scope"), dict) else {}
            scope_name = scope.get("name")
            raw_spans = scope_span.get("spans") if isinstance(scope_span.get("spans"), list) else []
            for span in raw_spans:
                if not isinstance(span, dict):
                    continue
                item = compact_span(span, service, resource_attrs, scope_name)
                attrs = item.get("attributes", {})
                if isinstance(attrs, dict) and isinstance(attrs.get("traceId"), str):
                    business_trace_ids.add(attrs.get("traceId"))
                name = str(item.get("name") or "")
                name_counts[name] += 1
                spans.append(item)
                if is_error_span(span, attrs if isinstance(attrs, dict) else {}):
                    error_spans.append(item)
                if grep_matches_item(item):
                    matched_spans.append(item)
    return {
        "traceId": trace_id,
        "traceCommand": trace_command(trace_id),
        "grepCommand": grep_command(trace_id),
        "meta": compact_meta(meta),
        "queryOk": rc == 0,
        "parseOk": True,
        "rawMatched": raw_matched,
        "bodyBytes": len(body.encode("utf-8")),
        "spanCount": len(spans),
        "serviceCount": len(services),
        "services": sorted(services),
        "businessTraceIds": sorted(business_trace_ids)[:20],
        "errorSpanCount": len(error_spans),
        "matchedSpanCount": len(matched_spans) if GREP is not None else None,
        "spanNameCounts": [{"name": name, "count": count} for name, count in name_counts.most_common(10)],
        "errorSpans": error_spans[:LIMIT] if FULL else error_spans[: min(3, LIMIT)],
        "matchedSpans": matched_spans[:LIMIT],
        "stderrTail": (stderr or "")[-1000:],
    }

search_rc, search_body, search_err = run_kubectl(SEARCH_PROXY_PATH, timeout=15)
search_parsed = parse_json(search_body)
search_parse_ok = isinstance(search_parsed, dict)
trace_metas = extract_traces(search_parsed)
candidate_trace_ids = []
seen = set()
for meta in trace_metas:
    trace_id = trace_id_from_meta(meta)
    if trace_id is None or trace_id in seen:
        continue
    seen.add(trace_id)
    candidate_trace_ids.append((trace_id, meta))
    if len(candidate_trace_ids) >= CANDIDATE_LIMIT:
        break

matched = []
scanned = []
scan_stopped = None
for trace_id, meta in candidate_trace_ids:
    if time.time() > DEADLINE:
        scan_stopped = "deadline"
        break
    trace_path = TRACE_PATH_TEMPLATE.replace("{{traceId}}", trace_id)
    trace_rc, trace_body, trace_err = run_kubectl(TRACE_PROXY_PREFIX + trace_path, timeout=8)
    summary = trace_summary(trace_id, meta, trace_body, trace_rc, trace_err)
    scanned.append(summary)
    if GREP is None or summary.get("rawMatched") is True or (summary.get("matchedSpanCount") or 0) > 0:
        matched.append(summary)
    if len(matched) >= LIMIT and GREP is not None:
        break

# The grep scan stops at exactly LIMIT matches, so also report truncation when
# candidates were left unscanned after the limit was reached.
unscanned_left = len(scanned) < len(candidate_trace_ids)
matched_truncated = len(matched) > LIMIT or (len(matched) >= LIMIT and unscanned_left)

payload = {
    "ok": search_rc == 0 and search_parse_ok,
    "searchPath": SEARCH_PATH,
    "searchProxyPath": SEARCH_PROXY_PATH,
    "grep": GREP,
    "tempoQuery": QUERY,
    "limit": LIMIT,
    "candidateLimit": CANDIDATE_LIMIT,
    "searchParseOk": search_parse_ok,
    "candidateTraceCount": len(candidate_trace_ids),
    "scannedTraceCount": len(scanned),
    "matchedTraceCount": len(matched),
    "scanStopped": scan_stopped,
    "traces": matched[:LIMIT] if GREP is not None else scanned[:LIMIT],
    "truncated": {
        "candidateTraces": len(trace_metas) > len(candidate_trace_ids),
        "matchedTraces": matched_truncated,
    },
    "next": {
        "expandWindow": "bun scripts/cli.ts platform-infra observability search --target %s --grep <text> --lookback-minutes 1440 --candidate-limit 200 --limit 40" % TARGET_ARG,
        "traceDetail": "bun scripts/cli.ts platform-infra observability trace --target %s --trace-id <traceId> --grep <text> --limit 80" % TARGET_ARG,
        "raw": "bun scripts/cli.ts platform-infra observability search --target %s --grep <text> --raw" % TARGET_ARG,
    },
    "searchStderrTail": (search_err or "")[-2000:],
}
if RAW:
    payload["rawSearchBody"] = search_parsed if search_parse_ok else search_body[-12000:]
print(json.dumps(payload, ensure_ascii=False, indent=2))
raise SystemExit(0 if payload["ok"] else 1)
PY
`;
}
function configSummary(observability: ObservabilityConfig, target: ObservabilityTarget): Record<string, unknown> {
return {
configPath: configLabel,
+1
View File
@@ -303,6 +303,7 @@ export function platformInfraHelp(): unknown {
"bun scripts/cli.ts platform-infra observability status --target D601",
"bun scripts/cli.ts platform-infra observability validate --target D601",
"bun scripts/cli.ts platform-infra observability trace --target D601 --trace-id <traceId> [--grep text] [--limit 40] [--full|--raw]",
"bun scripts/cli.ts platform-infra observability search --target D601 --grep 'no rollout found' [--lookback-minutes 360] [--candidate-limit 80] [--limit 20]",
],
description: "Operate YAML-controlled platform-infra services such as Sub2API, LangBot, n8n, WeChat archive workflows and OpenTelemetry tracing. Public services use PK01 Caddy+FRP rather than Kubernetes Ingress, NodePort, or LoadBalancer.",
target,