diff --git a/config/platform-infra/sub2api-codex-pool.yaml b/config/platform-infra/sub2api-codex-pool.yaml index 8dc79644..44198419 100644 --- a/config/platform-infra/sub2api-codex-pool.yaml +++ b/config/platform-infra/sub2api-codex-pool.yaml @@ -179,6 +179,14 @@ sentinel: maxOutputTokens: 16 transportRetryMinutes: 5 userAgent: Go-http-client/1.1 + gatewayFailureMonitor: + enabled: true + lookbackSeconds: 900 + tailLines: 4000 + freezeTtlMinutes: 10 + paths: + - /responses + - /v1/responses cadence: successInitialIntervalMinutes: 1 successMaxIntervalMinutes: 20 diff --git a/scripts/src/platform-infra-sub2api-codex-sentinel.ts b/scripts/src/platform-infra-sub2api-codex-sentinel.ts index fa5de950..19be7ae1 100644 --- a/scripts/src/platform-infra-sub2api-codex-sentinel.ts +++ b/scripts/src/platform-infra-sub2api-codex-sentinel.ts @@ -26,6 +26,13 @@ export interface CodexPoolSentinelConfig { transportRetryMinutes: number; userAgent: string; }; + gatewayFailureMonitor: { + enabled: boolean; + lookbackSeconds: number; + tailLines: number; + freezeTtlMinutes: number; + paths: string[]; + }; sdk: { openaiPythonVersion: string; }; @@ -100,6 +107,13 @@ export function defaultCodexPoolSentinelConfig(): CodexPoolSentinelConfig { transportRetryMinutes: 5, userAgent: "Go-http-client/1.1", }, + gatewayFailureMonitor: { + enabled: false, + lookbackSeconds: 900, + tailLines: 4000, + freezeTtlMinutes: 10, + paths: ["/responses", "/v1/responses"], + }, sdk: { openaiPythonVersion: "2.41.1", }, @@ -146,6 +160,7 @@ export function readCodexPoolSentinelConfig(value: unknown, defaults: CodexPoolS const actions = isRecord(value.actions) ? value.actions : {}; const marker = isRecord(value.marker) ? value.marker : {}; const probe = isRecord(value.probe) ? value.probe : {}; + const gatewayFailureMonitor = isRecord(value.gatewayFailureMonitor) ? value.gatewayFailureMonitor : {}; const sdk = isRecord(value.sdk) ? value.sdk : {}; const cadence = isRecord(value.cadence) ? value.cadence : {}; const freeze = isRecord(value.freeze) ? value.freeze : {}; @@ -179,6 +194,13 @@ export function readCodexPoolSentinelConfig(value: unknown, defaults: CodexPoolS transportRetryMinutes: readInt(valueAt(probe, "transportRetryMinutes"), `${sourcePath}.sentinel.probe.transportRetryMinutes`, defaults.probe.transportRetryMinutes, 1, 120), userAgent: readUserAgent(valueAt(probe, "userAgent"), `${sourcePath}.sentinel.probe.userAgent`, defaults.probe.userAgent), }, + gatewayFailureMonitor: { + enabled: readBoolean(valueAt(gatewayFailureMonitor, "enabled"), `${sourcePath}.sentinel.gatewayFailureMonitor.enabled`, defaults.gatewayFailureMonitor.enabled), + lookbackSeconds: readInt(valueAt(gatewayFailureMonitor, "lookbackSeconds"), `${sourcePath}.sentinel.gatewayFailureMonitor.lookbackSeconds`, defaults.gatewayFailureMonitor.lookbackSeconds, 60, 7200), + tailLines: readInt(valueAt(gatewayFailureMonitor, "tailLines"), `${sourcePath}.sentinel.gatewayFailureMonitor.tailLines`, defaults.gatewayFailureMonitor.tailLines, 100, 50000), + freezeTtlMinutes: readInt(valueAt(gatewayFailureMonitor, "freezeTtlMinutes"), `${sourcePath}.sentinel.gatewayFailureMonitor.freezeTtlMinutes`, defaults.gatewayFailureMonitor.freezeTtlMinutes, 1, 1440), + paths: readPathList(valueAt(gatewayFailureMonitor, "paths"), `${sourcePath}.sentinel.gatewayFailureMonitor.paths`, defaults.gatewayFailureMonitor.paths), + }, sdk: { openaiPythonVersion: readOpenAiPythonVersion(valueAt(sdk, "openaiPythonVersion"), `${sourcePath}.sentinel.sdk.openaiPythonVersion`, defaults.sdk.openaiPythonVersion), }, @@ -239,6 +261,7 @@ export function codexPoolSentinelSummary(config: CodexPoolSentinelConfig): Recor model: config.model, endpoint: config.endpoint, probe: config.probe, + gatewayFailureMonitor: config.gatewayFailureMonitor, sdk: config.sdk, cadence: config.cadence, freeze: config.freeze, @@ -271,6 +294,7 @@ export function renderCodexPoolSentinelManifest( endpoint: config.endpoint, marker: config.marker, probe: config.probe, + gatewayFailureMonitor: config.gatewayFailureMonitor, sdk: config.sdk, cadence: config.cadence, freeze: config.freeze, @@ -336,6 +360,12 @@ rules: - apiGroups: [""] resources: ["configmaps"] verbs: ["get", "create", "update", "patch"] + - apiGroups: [""] + resources: ["pods"] + verbs: ["get", "list"] + - apiGroups: [""] + resources: ["pods/log"] + verbs: ["get"] --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding @@ -527,7 +557,13 @@ class KubeClient: try: with request.urlopen(req, timeout=15, context=self.context) as resp: raw = resp.read() - return resp.status, json.loads(raw.decode("utf-8")) if raw else None + if not raw: + return resp.status, None + text = raw.decode("utf-8", errors="replace") + try: + return resp.status, json.loads(text) + except Exception: + return resp.status, text except error.HTTPError as exc: raw = exc.read() try: @@ -572,6 +608,24 @@ class KubeClient: raise RuntimeError(f"update state configmap {name} failed: {status} {data}") return data + def list_pods(self, label_selector): + query = parse.urlencode({"labelSelector": label_selector}) + status, data = self.api("GET", f"/api/v1/namespaces/{self.namespace}/pods?{query}") + if status >= 300: + raise RuntimeError(f"list pods failed: {status} {data}") + return data.get("items") if isinstance(data, dict) and isinstance(data.get("items"), list) else [] + + def pod_logs(self, pod_name, container, since_seconds, tail_lines): + query = parse.urlencode({ + "container": container, + "sinceSeconds": int(since_seconds), + "tailLines": int(tail_lines), + }) + status, data = self.api("GET", f"/api/v1/namespaces/{self.namespace}/pods/{url_quote(pod_name)}/log?{query}") + if status >= 300: + raise RuntimeError(f"get pod logs failed: {status} {data}") + return data if isinstance(data, str) else "" + def default_state(): return { "version": 1, @@ -1246,6 +1300,242 @@ def apply_result(result, state, config, now, admin, profile): } return action +def log_line_payload(line): + pos = line.find("{") + if pos < 0: + return None, "", None + prefix = line[:pos].rstrip("\t ") + parts = prefix.split("\t") + ts = parts[0] if parts else "" + message = parts[3] if len(parts) >= 4 else "" + try: + return ts, message, json.loads(line[pos:]) + except Exception: + return ts, message, None + +def gateway_monitor_paths(config): + cfg = config.get("gatewayFailureMonitor") if isinstance(config.get("gatewayFailureMonitor"), dict) else {} + paths = cfg.get("paths") + if isinstance(paths, list) and paths: + return set(str(item) for item in paths if isinstance(item, str) and item) + return {"/responses", "/v1/responses"} + +def is_gateway_stream_failure(message, payload, config): + if "openai.forward_failed" not in message or not isinstance(payload, dict): + return False + path = payload.get("path") + if path not in gateway_monitor_paths(config): + return False + if payload.get("account_id") is None: + return False + error_text = str(payload.get("error") or "").lower() + fallback_written = payload.get("fallback_error_response_written") is True + upstream_already_written = payload.get("upstream_error_response_already_written") is True + stream_failure = any(token in error_text for token in ( + "stream usage incomplete", + "missing terminal event", + "stream read error", + "stream data interval timeout", + )) + return fallback_written or upstream_already_written or stream_failure + +def gateway_failure_item(ts, pod_name, payload): + request_id = payload.get("request_id") or sha(json.dumps(payload, sort_keys=True, ensure_ascii=False)) + try: + account_id = int(payload.get("account_id")) + except Exception: + account_id = None + return { + "at": ts, + "pod": pod_name, + "requestId": request_id, + "clientRequestId": payload.get("client_request_id"), + "accountId": account_id, + "path": payload.get("path"), + "errorPreview": preview(payload.get("error"), 240), + "fallbackErrorResponseWritten": payload.get("fallback_error_response_written") is True, + "upstreamErrorResponseAlreadyWritten": payload.get("upstream_error_response_already_written") is True, + "bodyBytes": payload.get("body_bytes"), + "latencyMs": payload.get("latency_ms"), + "statusCode": payload.get("status_code"), + "upstreamStatus": payload.get("upstream_status"), + } + +def trim_gateway_seen(monitor_state, now, lookback_seconds): + seen = monitor_state.setdefault("seenRequestIds", {}) + if not isinstance(seen, dict): + seen = {} + monitor_state["seenRequestIds"] = seen + cutoff = now - timedelta(seconds=max(int(lookback_seconds) * 4, 3600)) + for request_id, seen_at in list(seen.items()): + parsed = parse_iso(seen_at) + if parsed is None or parsed < cutoff: + seen.pop(request_id, None) + return seen + +def gateway_failure_account_map(admin): + by_id = {} + for name, account in admin.accounts().items(): + try: + account_id = int(account.get("id")) + except Exception: + continue + by_id[account_id] = name + return by_id + +def apply_gateway_failure(account_name, failures, state, config, now, admin, profile): + latest = failures[-1] + account_state = state.setdefault("accounts", {}).setdefault(account_name, {}) + monitor_cfg = config.get("gatewayFailureMonitor") if isinstance(config.get("gatewayFailureMonitor"), dict) else {} + interval = int(monitor_cfg.get("freezeTtlMinutes") or config["freeze"]["maxTtlMinutes"]) + until = add_minutes(now, interval, int(config["freeze"]["jitterPercent"])) + actions_enabled = bool(config["actions"]["enabled"]) + applied = False + action = {"taken": False, "type": "would-freeze"} + if actions_enabled: + try: + action = {"taken": True, "type": "freeze", "result": admin.set_schedulable(account_name, False)} + applied = True + except Exception as exc: + action = {"taken": False, "type": "freeze-failed", "error": str(exc)} + account_state["quarantine"] = { + "active": True, + "applied": applied, + "until": iso(until), + "intervalMinutes": interval, + "reason": "gateway-forward-failure", + "failureKind": "gateway-stream-forward-failure", + "errorDetails": { + "kind": "Sub2APIGatewayForwardFailure", + "requestId": latest.get("requestId"), + "clientRequestId": latest.get("clientRequestId"), + "path": latest.get("path"), + "errorPreview": latest.get("errorPreview"), + "fallbackErrorResponseWritten": latest.get("fallbackErrorResponseWritten"), + "upstreamErrorResponseAlreadyWritten": latest.get("upstreamErrorResponseAlreadyWritten"), + "bodyBytes": latest.get("bodyBytes"), + "latencyMs": latest.get("latencyMs"), + "countInRun": len(failures), + }, + "lastBadAt": iso(now), + } + account_state["nextProbeAfter"] = iso(until) + account_state["successStreak"] = 0 + account_state["successIntervalMinutes"] = 0 + account_state["successMaxIntervalMinutes"] = success_max_interval(profile, config) + account_state["lastStatus"] = "quarantined" + account_state["lastFailureAt"] = iso(now) + account_state["lastGatewayFailureAt"] = iso(now) + account_state["lastGatewayFailure"] = { + "accountName": account_name, + "accountId": latest.get("accountId"), + "requestId": latest.get("requestId"), + "clientRequestId": latest.get("clientRequestId"), + "path": latest.get("path"), + "errorPreview": latest.get("errorPreview"), + "fallbackErrorResponseWritten": latest.get("fallbackErrorResponseWritten"), + "upstreamErrorResponseAlreadyWritten": latest.get("upstreamErrorResponseAlreadyWritten"), + "bodyBytes": latest.get("bodyBytes"), + "latencyMs": latest.get("latencyMs"), + "countInRun": len(failures), + "firstAt": failures[0].get("at"), + "lastAt": latest.get("at"), + "freezeUntil": iso(until), + "action": action, + } + return action + +def run_gateway_failure_monitor(state, config, now, kube, admin, profiles): + cfg = config.get("gatewayFailureMonitor") if isinstance(config.get("gatewayFailureMonitor"), dict) else {} + if cfg.get("enabled") is not True: + return {"enabled": False, "scanned": 0, "newFailures": 0, "actions": []} + lookback_seconds = int(cfg.get("lookbackSeconds") or 900) + tail_lines = int(cfg.get("tailLines") or 4000) + monitor_state = state.setdefault("gatewayFailureMonitor", {}) + if not isinstance(monitor_state, dict): + monitor_state = {} + state["gatewayFailureMonitor"] = monitor_state + seen = trim_gateway_seen(monitor_state, now, lookback_seconds) + pods = kube.list_pods("app.kubernetes.io/name=sub2api") + candidates = [] + log_errors = [] + for pod in pods: + metadata = pod.get("metadata") if isinstance(pod, dict) else {} + status = pod.get("status") if isinstance(pod, dict) else {} + pod_name = metadata.get("name") + if not isinstance(pod_name, str) or not pod_name: + continue + if status.get("phase") not in (None, "Running"): + continue + try: + logs = kube.pod_logs(pod_name, "sub2api", lookback_seconds, tail_lines) + except Exception as exc: + log_errors.append({"pod": pod_name, "error": str(exc)}) + continue + for line in str(logs).splitlines(): + ts, message, payload = log_line_payload(line) + if is_gateway_stream_failure(message, payload, config): + candidates.append(gateway_failure_item(ts, pod_name, payload)) + by_id = gateway_failure_account_map(admin) if candidates else {} + profile_by_name = {item.get("accountName"): item for item in profiles if isinstance(item, dict) and isinstance(item.get("accountName"), str)} + by_account = {} + skipped = [] + new_failures = [] + for item in candidates: + request_id = item.get("requestId") + if not isinstance(request_id, str) or not request_id: + request_id = sha(json.dumps(item, sort_keys=True, ensure_ascii=False)) + item["requestId"] = request_id + if request_id in seen: + continue + seen[request_id] = iso(now) + account_name = by_id.get(item.get("accountId")) + if not account_name: + skipped.append({"requestId": request_id, "accountId": item.get("accountId"), "reason": "account-id-not-managed"}) + continue + if account_name not in profile_by_name: + skipped.append({"requestId": request_id, "accountId": item.get("accountId"), "accountName": account_name, "reason": "account-not-in-profiles"}) + continue + item["accountName"] = account_name + by_account.setdefault(account_name, []).append(item) + new_failures.append(item) + actions = [] + for account_name, failures in sorted(by_account.items()): + failures.sort(key=lambda item: item.get("at") or "") + profile = profile_by_name[account_name] + action = apply_gateway_failure(account_name, failures, state, config, now, admin, profile) + actions.append({ + "accountName": account_name, + "accountId": failures[-1].get("accountId"), + "failureCount": len(failures), + "requestId": failures[-1].get("requestId"), + "path": failures[-1].get("path"), + "errorPreview": failures[-1].get("errorPreview"), + "taken": action.get("taken"), + "type": action.get("type"), + "error": action.get("error"), + }) + monitor_state["lastRunAt"] = iso(now) + monitor_state["lastScannedPods"] = [((pod.get("metadata") or {}).get("name")) for pod in pods if isinstance(pod, dict)] + monitor_state["lastCandidateCount"] = len(candidates) + monitor_state["lastNewFailureCount"] = len(new_failures) + monitor_state["lastActionCount"] = len(actions) + monitor_state["lastFailures"] = new_failures[-20:] + monitor_state["lastSkipped"] = skipped[-20:] + monitor_state["lastLogErrors"] = log_errors[-10:] + return { + "enabled": True, + "lookbackSeconds": lookback_seconds, + "tailLines": tail_lines, + "scannedPods": len(pods), + "candidates": len(candidates), + "newFailures": len(new_failures), + "actionsTaken": sum(1 for item in actions if item.get("taken") is True), + "actions": actions[-20:], + "skipped": skipped[-20:], + "logErrors": log_errors[-10:], + } + def reconcile_active_quarantines(state, config, now, admin): actions = [] if not config["actions"]["enabled"]: @@ -1283,6 +1573,7 @@ def main(): admin = Sub2ApiAdmin(config) reconcile = reconcile_active_quarantines(state, config, now, admin) forced_names = forced_account_names() + gateway_monitor = {"enabled": False, "skipped": "forced-manual-probe"} if forced_names else run_gateway_failure_monitor(state, config, now, kube, admin, profiles) if forced_names: due, selection = choose_forced_profiles(profiles, state, config, now, forced_names) else: @@ -1309,6 +1600,13 @@ def main(): "markerMismatchCount": sum(1 for item in results if item.get("markerMatched") is not True), "transportFailureCount": sum(1 for item in results if item.get("transportFailure") is True), "actionsTaken": sum(1 for item in actions if item.get("taken") is True), + "gatewayFailureMonitor": { + "enabled": gateway_monitor.get("enabled") is True, + "newFailures": gateway_monitor.get("newFailures", 0), + "actionsTaken": gateway_monitor.get("actionsTaken", 0), + "skipped": gateway_monitor.get("skipped"), + "logErrors": gateway_monitor.get("logErrors"), + }, "selection": selection, "reconcile": reconcile[-20:], } @@ -1339,6 +1637,7 @@ def main(): "requestShape": item.get("requestShape"), } for item in results], "actions": actions, + "gatewayFailureMonitor": gateway_monitor, "valuesPrinted": False, }, ensure_ascii=False)) @@ -1405,6 +1704,18 @@ function readUserAgent(value: unknown, key: string, fallback: string): string { return text; } +function readPathList(value: unknown, key: string, fallback: string[]): string[] { + if (value === undefined || value === null) return fallback; + if (!Array.isArray(value) || value.length === 0) throw new Error(`${key} must be a non-empty string array`); + const paths = value.map((item, index) => { + if (typeof item !== "string" || item.trim().length === 0) throw new Error(`${key}[${index}] must be a non-empty string`); + const path = item.trim(); + if (!/^\/[A-Za-z0-9._~!$&'()*+,;=:@/-]*$/u.test(path)) throw new Error(`${key}[${index}] has an unsupported HTTP path`); + return path; + }); + return [...new Set(paths)]; +} + function readOpenAiPythonVersion(value: unknown, key: string, fallback: string): string { const text = readString(value, key, fallback); if (!/^[0-9]+[.][0-9]+[.][0-9]+$/u.test(text)) throw new Error(`${key} must be a pinned semver version like 2.41.1`); diff --git a/scripts/src/platform-infra-sub2api-codex.ts b/scripts/src/platform-infra-sub2api-codex.ts index d0531433..f67288ba 100644 --- a/scripts/src/platform-infra-sub2api-codex.ts +++ b/scripts/src/platform-infra-sub2api-codex.ts @@ -1869,6 +1869,7 @@ function compactSentinelProbeResult(parsed: Record | null): Rec markerMismatchCount: summary.markerMismatchCount, transportFailureCount: summary.transportFailureCount, actionsTaken: summary.actionsTaken, + gatewayFailureMonitor: summary.gatewayFailureMonitor, selection: summary.selection, }, results: recordArray(probe.results).map((item) => pickSummaryFields(item, [ @@ -1955,7 +1956,7 @@ function renderSentinelReport( textValue(account.successIntervalMin), textValue(account.successMaxIntervalMin), textValue(account.probeCount), - shortIso(account.lastProbeAt), + shortIso(account.lastEventAt ?? account.lastProbeAt), textValue(account.lastHttp), account.lastMarker === true ? "Y" : account.lastMarker === false ? "N" : "-", shorten(stringValue(account.lastFailureKind) ?? "-", 20), @@ -1968,7 +1969,7 @@ function renderSentinelReport( lines.push(""); lines.push(`RUNS last=${Math.min(context.events, runs.length)}`); lines.push(renderTable([ - ["AT", "SEL", "DUE", "OK", "BAD", "TF", "ACT", "REASSERT"], + ["AT", "SEL", "DUE", "OK", "BAD", "TF", "ACT", "GF", "GACT", "REASSERT"], ...runs.slice(-context.events).map((run) => [ shortIso(run.at), textValue(run.selected), @@ -1977,12 +1978,14 @@ function renderSentinelReport( textValue(run.mismatch), textValue(run.transportFailures), textValue(run.actionsTaken), + textValue(run.gatewayFailures), + textValue(run.gatewayActions), textValue(run.reasserts), ]), ])); } lines.push(""); - lines.push("LEGEND Q=quarantined T=trusted upstream M=marker matched F_MIN=freeze interval S_MIN=success interval S_MAX=success max interval OBS_MIN=last probe to next probe minutes TF=transport failures"); + lines.push("LEGEND Q=quarantined T=trusted upstream M=marker matched F_MIN=freeze interval S_MIN=success interval S_MAX=success max interval OBS_MIN=last event to next probe minutes TF=transport failures GF=gateway failures GACT=gateway freeze actions"); lines.push("Raw: bun scripts/cli.ts platform-infra sub2api codex-pool sentinel-report --raw"); return lines.join("\n"); } @@ -2973,6 +2976,12 @@ def report(): continue probe = account_state.get("lastProbe") if isinstance(account_state.get("lastProbe"), dict) else {} quarantine = account_state.get("quarantine") if isinstance(account_state.get("quarantine"), dict) else {} + gateway_failure = account_state.get("lastGatewayFailure") if isinstance(account_state.get("lastGatewayFailure"), dict) else {} + last_event_at = account_state.get("lastGatewayFailureAt") or account_state.get("lastProbeAt") + last_failure_kind = quarantine.get("failureKind") if quarantine.get("active") is True and quarantine.get("failureKind") else probe.get("failureKind") + last_action = action_type(probe) + if gateway_failure and (not account_state.get("lastProbeAt") or str(account_state.get("lastGatewayFailureAt") or "") >= str(account_state.get("lastProbeAt") or "")): + last_action = action_type(gateway_failure) ledger = account_ledger(account_state) account_rows.append({ "account": name, @@ -2991,14 +3000,16 @@ def report(): "totalTokens": ledger.get("totalTokens", 0), "estimatedCostUsd": round(float(ledger.get("estimatedCostUsd", 0)), 6), "lastProbeAt": account_state.get("lastProbeAt"), + "lastEventAt": last_event_at, + "lastGatewayFailureAt": account_state.get("lastGatewayFailureAt"), "lastPurpose": probe.get("purpose"), "lastHttp": probe.get("httpStatus"), "lastMarker": probe.get("markerMatched"), - "lastFailureKind": probe.get("failureKind"), + "lastFailureKind": last_failure_kind, "lastErrorCode": error_code(probe), - "lastAction": action_type(probe), + "lastAction": last_action, "nextProbeAfter": account_state.get("nextProbeAfter"), - "observedLastToNextMin": minutes_between(account_state.get("lastProbeAt"), account_state.get("nextProbeAfter")), + "observedLastToNextMin": minutes_between(last_event_at, account_state.get("nextProbeAfter")), "requestShape": probe.get("requestShape"), }) run_rows = [] @@ -3006,6 +3017,7 @@ def report(): if not isinstance(item, dict): continue selection = item.get("selection") if isinstance(item.get("selection"), dict) else {} + gateway = item.get("gatewayFailureMonitor") if isinstance(item.get("gatewayFailureMonitor"), dict) else {} run_rows.append({ "at": item.get("at"), "selected": item.get("selected"), @@ -3014,6 +3026,8 @@ def report(): "mismatch": item.get("mismatchCount") if item.get("mismatchCount") is not None else item.get("markerMismatchCount"), "transportFailures": item.get("transportFailureCount"), "actionsTaken": item.get("actionsTaken"), + "gatewayFailures": gateway.get("newFailures"), + "gatewayActions": gateway.get("actionsTaken"), "reasserts": len(item.get("reconcile") or []), }) quarantined = [item for item in account_rows if item.get("quarantineActive") is True]