fix: freeze codex accounts on gateway stream failures

This commit is contained in:
Codex
2026-06-11 17:06:37 +00:00
parent 170b538fe6
commit 35c18784a2
3 changed files with 340 additions and 7 deletions
@@ -179,6 +179,14 @@ sentinel:
maxOutputTokens: 16
transportRetryMinutes: 5
userAgent: Go-http-client/1.1
gatewayFailureMonitor:
enabled: true
lookbackSeconds: 900
tailLines: 4000
freezeTtlMinutes: 10
paths:
- /responses
- /v1/responses
cadence:
successInitialIntervalMinutes: 1
successMaxIntervalMinutes: 20
@@ -26,6 +26,13 @@ export interface CodexPoolSentinelConfig {
transportRetryMinutes: number;
userAgent: string;
};
gatewayFailureMonitor: {
enabled: boolean;
lookbackSeconds: number;
tailLines: number;
freezeTtlMinutes: number;
paths: string[];
};
sdk: {
openaiPythonVersion: string;
};
@@ -100,6 +107,13 @@ export function defaultCodexPoolSentinelConfig(): CodexPoolSentinelConfig {
transportRetryMinutes: 5,
userAgent: "Go-http-client/1.1",
},
gatewayFailureMonitor: {
enabled: false,
lookbackSeconds: 900,
tailLines: 4000,
freezeTtlMinutes: 10,
paths: ["/responses", "/v1/responses"],
},
sdk: {
openaiPythonVersion: "2.41.1",
},
@@ -146,6 +160,7 @@ export function readCodexPoolSentinelConfig(value: unknown, defaults: CodexPoolS
const actions = isRecord(value.actions) ? value.actions : {};
const marker = isRecord(value.marker) ? value.marker : {};
const probe = isRecord(value.probe) ? value.probe : {};
const gatewayFailureMonitor = isRecord(value.gatewayFailureMonitor) ? value.gatewayFailureMonitor : {};
const sdk = isRecord(value.sdk) ? value.sdk : {};
const cadence = isRecord(value.cadence) ? value.cadence : {};
const freeze = isRecord(value.freeze) ? value.freeze : {};
@@ -179,6 +194,13 @@ export function readCodexPoolSentinelConfig(value: unknown, defaults: CodexPoolS
transportRetryMinutes: readInt(valueAt(probe, "transportRetryMinutes"), `${sourcePath}.sentinel.probe.transportRetryMinutes`, defaults.probe.transportRetryMinutes, 1, 120),
userAgent: readUserAgent(valueAt(probe, "userAgent"), `${sourcePath}.sentinel.probe.userAgent`, defaults.probe.userAgent),
},
gatewayFailureMonitor: {
enabled: readBoolean(valueAt(gatewayFailureMonitor, "enabled"), `${sourcePath}.sentinel.gatewayFailureMonitor.enabled`, defaults.gatewayFailureMonitor.enabled),
lookbackSeconds: readInt(valueAt(gatewayFailureMonitor, "lookbackSeconds"), `${sourcePath}.sentinel.gatewayFailureMonitor.lookbackSeconds`, defaults.gatewayFailureMonitor.lookbackSeconds, 60, 7200),
tailLines: readInt(valueAt(gatewayFailureMonitor, "tailLines"), `${sourcePath}.sentinel.gatewayFailureMonitor.tailLines`, defaults.gatewayFailureMonitor.tailLines, 100, 50000),
freezeTtlMinutes: readInt(valueAt(gatewayFailureMonitor, "freezeTtlMinutes"), `${sourcePath}.sentinel.gatewayFailureMonitor.freezeTtlMinutes`, defaults.gatewayFailureMonitor.freezeTtlMinutes, 1, 1440),
paths: readPathList(valueAt(gatewayFailureMonitor, "paths"), `${sourcePath}.sentinel.gatewayFailureMonitor.paths`, defaults.gatewayFailureMonitor.paths),
},
sdk: {
openaiPythonVersion: readOpenAiPythonVersion(valueAt(sdk, "openaiPythonVersion"), `${sourcePath}.sentinel.sdk.openaiPythonVersion`, defaults.sdk.openaiPythonVersion),
},
@@ -239,6 +261,7 @@ export function codexPoolSentinelSummary(config: CodexPoolSentinelConfig): Recor
model: config.model,
endpoint: config.endpoint,
probe: config.probe,
gatewayFailureMonitor: config.gatewayFailureMonitor,
sdk: config.sdk,
cadence: config.cadence,
freeze: config.freeze,
@@ -271,6 +294,7 @@ export function renderCodexPoolSentinelManifest(
endpoint: config.endpoint,
marker: config.marker,
probe: config.probe,
gatewayFailureMonitor: config.gatewayFailureMonitor,
sdk: config.sdk,
cadence: config.cadence,
freeze: config.freeze,
@@ -336,6 +360,12 @@ rules:
- apiGroups: [""]
resources: ["configmaps"]
verbs: ["get", "create", "update", "patch"]
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list"]
- apiGroups: [""]
resources: ["pods/log"]
verbs: ["get"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
@@ -527,7 +557,13 @@ class KubeClient:
try:
with request.urlopen(req, timeout=15, context=self.context) as resp:
raw = resp.read()
return resp.status, json.loads(raw.decode("utf-8")) if raw else None
if not raw:
return resp.status, None
text = raw.decode("utf-8", errors="replace")
try:
return resp.status, json.loads(text)
except Exception:
return resp.status, text
except error.HTTPError as exc:
raw = exc.read()
try:
@@ -572,6 +608,24 @@ class KubeClient:
raise RuntimeError(f"update state configmap {name} failed: {status} {data}")
return data
def list_pods(self, label_selector):
query = parse.urlencode({"labelSelector": label_selector})
status, data = self.api("GET", f"/api/v1/namespaces/{self.namespace}/pods?{query}")
if status >= 300:
raise RuntimeError(f"list pods failed: {status} {data}")
return data.get("items") if isinstance(data, dict) and isinstance(data.get("items"), list) else []
def pod_logs(self, pod_name, container, since_seconds, tail_lines):
query = parse.urlencode({
"container": container,
"sinceSeconds": int(since_seconds),
"tailLines": int(tail_lines),
})
status, data = self.api("GET", f"/api/v1/namespaces/{self.namespace}/pods/{url_quote(pod_name)}/log?{query}")
if status >= 300:
raise RuntimeError(f"get pod logs failed: {status} {data}")
return data if isinstance(data, str) else ""
def default_state():
return {
"version": 1,
@@ -1246,6 +1300,242 @@ def apply_result(result, state, config, now, admin, profile):
}
return action
def log_line_payload(line):
pos = line.find("{")
if pos < 0:
return None, "", None
prefix = line[:pos].rstrip("\t ")
parts = prefix.split("\t")
ts = parts[0] if parts else ""
message = parts[3] if len(parts) >= 4 else ""
try:
return ts, message, json.loads(line[pos:])
except Exception:
return ts, message, None
def gateway_monitor_paths(config):
cfg = config.get("gatewayFailureMonitor") if isinstance(config.get("gatewayFailureMonitor"), dict) else {}
paths = cfg.get("paths")
if isinstance(paths, list) and paths:
return set(str(item) for item in paths if isinstance(item, str) and item)
return {"/responses", "/v1/responses"}
def is_gateway_stream_failure(message, payload, config):
if "openai.forward_failed" not in message or not isinstance(payload, dict):
return False
path = payload.get("path")
if path not in gateway_monitor_paths(config):
return False
if payload.get("account_id") is None:
return False
error_text = str(payload.get("error") or "").lower()
fallback_written = payload.get("fallback_error_response_written") is True
upstream_already_written = payload.get("upstream_error_response_already_written") is True
stream_failure = any(token in error_text for token in (
"stream usage incomplete",
"missing terminal event",
"stream read error",
"stream data interval timeout",
))
return fallback_written or upstream_already_written or stream_failure
def gateway_failure_item(ts, pod_name, payload):
request_id = payload.get("request_id") or sha(json.dumps(payload, sort_keys=True, ensure_ascii=False))
try:
account_id = int(payload.get("account_id"))
except Exception:
account_id = None
return {
"at": ts,
"pod": pod_name,
"requestId": request_id,
"clientRequestId": payload.get("client_request_id"),
"accountId": account_id,
"path": payload.get("path"),
"errorPreview": preview(payload.get("error"), 240),
"fallbackErrorResponseWritten": payload.get("fallback_error_response_written") is True,
"upstreamErrorResponseAlreadyWritten": payload.get("upstream_error_response_already_written") is True,
"bodyBytes": payload.get("body_bytes"),
"latencyMs": payload.get("latency_ms"),
"statusCode": payload.get("status_code"),
"upstreamStatus": payload.get("upstream_status"),
}
def trim_gateway_seen(monitor_state, now, lookback_seconds):
seen = monitor_state.setdefault("seenRequestIds", {})
if not isinstance(seen, dict):
seen = {}
monitor_state["seenRequestIds"] = seen
cutoff = now - timedelta(seconds=max(int(lookback_seconds) * 4, 3600))
for request_id, seen_at in list(seen.items()):
parsed = parse_iso(seen_at)
if parsed is None or parsed < cutoff:
seen.pop(request_id, None)
return seen
def gateway_failure_account_map(admin):
by_id = {}
for name, account in admin.accounts().items():
try:
account_id = int(account.get("id"))
except Exception:
continue
by_id[account_id] = name
return by_id
def apply_gateway_failure(account_name, failures, state, config, now, admin, profile):
latest = failures[-1]
account_state = state.setdefault("accounts", {}).setdefault(account_name, {})
monitor_cfg = config.get("gatewayFailureMonitor") if isinstance(config.get("gatewayFailureMonitor"), dict) else {}
interval = int(monitor_cfg.get("freezeTtlMinutes") or config["freeze"]["maxTtlMinutes"])
until = add_minutes(now, interval, int(config["freeze"]["jitterPercent"]))
actions_enabled = bool(config["actions"]["enabled"])
applied = False
action = {"taken": False, "type": "would-freeze"}
if actions_enabled:
try:
action = {"taken": True, "type": "freeze", "result": admin.set_schedulable(account_name, False)}
applied = True
except Exception as exc:
action = {"taken": False, "type": "freeze-failed", "error": str(exc)}
account_state["quarantine"] = {
"active": True,
"applied": applied,
"until": iso(until),
"intervalMinutes": interval,
"reason": "gateway-forward-failure",
"failureKind": "gateway-stream-forward-failure",
"errorDetails": {
"kind": "Sub2APIGatewayForwardFailure",
"requestId": latest.get("requestId"),
"clientRequestId": latest.get("clientRequestId"),
"path": latest.get("path"),
"errorPreview": latest.get("errorPreview"),
"fallbackErrorResponseWritten": latest.get("fallbackErrorResponseWritten"),
"upstreamErrorResponseAlreadyWritten": latest.get("upstreamErrorResponseAlreadyWritten"),
"bodyBytes": latest.get("bodyBytes"),
"latencyMs": latest.get("latencyMs"),
"countInRun": len(failures),
},
"lastBadAt": iso(now),
}
account_state["nextProbeAfter"] = iso(until)
account_state["successStreak"] = 0
account_state["successIntervalMinutes"] = 0
account_state["successMaxIntervalMinutes"] = success_max_interval(profile, config)
account_state["lastStatus"] = "quarantined"
account_state["lastFailureAt"] = iso(now)
account_state["lastGatewayFailureAt"] = iso(now)
account_state["lastGatewayFailure"] = {
"accountName": account_name,
"accountId": latest.get("accountId"),
"requestId": latest.get("requestId"),
"clientRequestId": latest.get("clientRequestId"),
"path": latest.get("path"),
"errorPreview": latest.get("errorPreview"),
"fallbackErrorResponseWritten": latest.get("fallbackErrorResponseWritten"),
"upstreamErrorResponseAlreadyWritten": latest.get("upstreamErrorResponseAlreadyWritten"),
"bodyBytes": latest.get("bodyBytes"),
"latencyMs": latest.get("latencyMs"),
"countInRun": len(failures),
"firstAt": failures[0].get("at"),
"lastAt": latest.get("at"),
"freezeUntil": iso(until),
"action": action,
}
return action
def run_gateway_failure_monitor(state, config, now, kube, admin, profiles):
cfg = config.get("gatewayFailureMonitor") if isinstance(config.get("gatewayFailureMonitor"), dict) else {}
if cfg.get("enabled") is not True:
return {"enabled": False, "scanned": 0, "newFailures": 0, "actions": []}
lookback_seconds = int(cfg.get("lookbackSeconds") or 900)
tail_lines = int(cfg.get("tailLines") or 4000)
monitor_state = state.setdefault("gatewayFailureMonitor", {})
if not isinstance(monitor_state, dict):
monitor_state = {}
state["gatewayFailureMonitor"] = monitor_state
seen = trim_gateway_seen(monitor_state, now, lookback_seconds)
pods = kube.list_pods("app.kubernetes.io/name=sub2api")
candidates = []
log_errors = []
for pod in pods:
metadata = pod.get("metadata") if isinstance(pod, dict) else {}
status = pod.get("status") if isinstance(pod, dict) else {}
pod_name = metadata.get("name")
if not isinstance(pod_name, str) or not pod_name:
continue
if status.get("phase") not in (None, "Running"):
continue
try:
logs = kube.pod_logs(pod_name, "sub2api", lookback_seconds, tail_lines)
except Exception as exc:
log_errors.append({"pod": pod_name, "error": str(exc)})
continue
for line in str(logs).splitlines():
ts, message, payload = log_line_payload(line)
if is_gateway_stream_failure(message, payload, config):
candidates.append(gateway_failure_item(ts, pod_name, payload))
by_id = gateway_failure_account_map(admin) if candidates else {}
profile_by_name = {item.get("accountName"): item for item in profiles if isinstance(item, dict) and isinstance(item.get("accountName"), str)}
by_account = {}
skipped = []
new_failures = []
for item in candidates:
request_id = item.get("requestId")
if not isinstance(request_id, str) or not request_id:
request_id = sha(json.dumps(item, sort_keys=True, ensure_ascii=False))
item["requestId"] = request_id
if request_id in seen:
continue
seen[request_id] = iso(now)
account_name = by_id.get(item.get("accountId"))
if not account_name:
skipped.append({"requestId": request_id, "accountId": item.get("accountId"), "reason": "account-id-not-managed"})
continue
if account_name not in profile_by_name:
skipped.append({"requestId": request_id, "accountId": item.get("accountId"), "accountName": account_name, "reason": "account-not-in-profiles"})
continue
item["accountName"] = account_name
by_account.setdefault(account_name, []).append(item)
new_failures.append(item)
actions = []
for account_name, failures in sorted(by_account.items()):
failures.sort(key=lambda item: item.get("at") or "")
profile = profile_by_name[account_name]
action = apply_gateway_failure(account_name, failures, state, config, now, admin, profile)
actions.append({
"accountName": account_name,
"accountId": failures[-1].get("accountId"),
"failureCount": len(failures),
"requestId": failures[-1].get("requestId"),
"path": failures[-1].get("path"),
"errorPreview": failures[-1].get("errorPreview"),
"taken": action.get("taken"),
"type": action.get("type"),
"error": action.get("error"),
})
monitor_state["lastRunAt"] = iso(now)
monitor_state["lastScannedPods"] = [((pod.get("metadata") or {}).get("name")) for pod in pods if isinstance(pod, dict)]
monitor_state["lastCandidateCount"] = len(candidates)
monitor_state["lastNewFailureCount"] = len(new_failures)
monitor_state["lastActionCount"] = len(actions)
monitor_state["lastFailures"] = new_failures[-20:]
monitor_state["lastSkipped"] = skipped[-20:]
monitor_state["lastLogErrors"] = log_errors[-10:]
return {
"enabled": True,
"lookbackSeconds": lookback_seconds,
"tailLines": tail_lines,
"scannedPods": len(pods),
"candidates": len(candidates),
"newFailures": len(new_failures),
"actionsTaken": sum(1 for item in actions if item.get("taken") is True),
"actions": actions[-20:],
"skipped": skipped[-20:],
"logErrors": log_errors[-10:],
}
def reconcile_active_quarantines(state, config, now, admin):
actions = []
if not config["actions"]["enabled"]:
@@ -1283,6 +1573,7 @@ def main():
admin = Sub2ApiAdmin(config)
reconcile = reconcile_active_quarantines(state, config, now, admin)
forced_names = forced_account_names()
gateway_monitor = {"enabled": False, "skipped": "forced-manual-probe"} if forced_names else run_gateway_failure_monitor(state, config, now, kube, admin, profiles)
if forced_names:
due, selection = choose_forced_profiles(profiles, state, config, now, forced_names)
else:
@@ -1309,6 +1600,13 @@ def main():
"markerMismatchCount": sum(1 for item in results if item.get("markerMatched") is not True),
"transportFailureCount": sum(1 for item in results if item.get("transportFailure") is True),
"actionsTaken": sum(1 for item in actions if item.get("taken") is True),
"gatewayFailureMonitor": {
"enabled": gateway_monitor.get("enabled") is True,
"newFailures": gateway_monitor.get("newFailures", 0),
"actionsTaken": gateway_monitor.get("actionsTaken", 0),
"skipped": gateway_monitor.get("skipped"),
"logErrors": gateway_monitor.get("logErrors"),
},
"selection": selection,
"reconcile": reconcile[-20:],
}
@@ -1339,6 +1637,7 @@ def main():
"requestShape": item.get("requestShape"),
} for item in results],
"actions": actions,
"gatewayFailureMonitor": gateway_monitor,
"valuesPrinted": False,
}, ensure_ascii=False))
@@ -1405,6 +1704,18 @@ function readUserAgent(value: unknown, key: string, fallback: string): string {
return text;
}
function readPathList(value: unknown, key: string, fallback: string[]): string[] {
if (value === undefined || value === null) return fallback;
if (!Array.isArray(value) || value.length === 0) throw new Error(`${key} must be a non-empty string array`);
const paths = value.map((item, index) => {
if (typeof item !== "string" || item.trim().length === 0) throw new Error(`${key}[${index}] must be a non-empty string`);
const path = item.trim();
if (!/^\/[A-Za-z0-9._~!$&'()*+,;=:@/-]*$/u.test(path)) throw new Error(`${key}[${index}] has an unsupported HTTP path`);
return path;
});
return [...new Set(paths)];
}
function readOpenAiPythonVersion(value: unknown, key: string, fallback: string): string {
const text = readString(value, key, fallback);
if (!/^[0-9]+[.][0-9]+[.][0-9]+$/u.test(text)) throw new Error(`${key} must be a pinned semver version like 2.41.1`);
+20 -6
View File
@@ -1869,6 +1869,7 @@ function compactSentinelProbeResult(parsed: Record<string, unknown> | null): Rec
markerMismatchCount: summary.markerMismatchCount,
transportFailureCount: summary.transportFailureCount,
actionsTaken: summary.actionsTaken,
gatewayFailureMonitor: summary.gatewayFailureMonitor,
selection: summary.selection,
},
results: recordArray(probe.results).map((item) => pickSummaryFields(item, [
@@ -1955,7 +1956,7 @@ function renderSentinelReport(
textValue(account.successIntervalMin),
textValue(account.successMaxIntervalMin),
textValue(account.probeCount),
shortIso(account.lastProbeAt),
shortIso(account.lastEventAt ?? account.lastProbeAt),
textValue(account.lastHttp),
account.lastMarker === true ? "Y" : account.lastMarker === false ? "N" : "-",
shorten(stringValue(account.lastFailureKind) ?? "-", 20),
@@ -1968,7 +1969,7 @@ function renderSentinelReport(
lines.push("");
lines.push(`RUNS last=${Math.min(context.events, runs.length)}`);
lines.push(renderTable([
["AT", "SEL", "DUE", "OK", "BAD", "TF", "ACT", "REASSERT"],
["AT", "SEL", "DUE", "OK", "BAD", "TF", "ACT", "GF", "GACT", "REASSERT"],
...runs.slice(-context.events).map((run) => [
shortIso(run.at),
textValue(run.selected),
@@ -1977,12 +1978,14 @@ function renderSentinelReport(
textValue(run.mismatch),
textValue(run.transportFailures),
textValue(run.actionsTaken),
textValue(run.gatewayFailures),
textValue(run.gatewayActions),
textValue(run.reasserts),
]),
]));
}
lines.push("");
lines.push("LEGEND Q=quarantined T=trusted upstream M=marker matched F_MIN=freeze interval S_MIN=success interval S_MAX=success max interval OBS_MIN=last probe to next probe minutes TF=transport failures");
lines.push("LEGEND Q=quarantined T=trusted upstream M=marker matched F_MIN=freeze interval S_MIN=success interval S_MAX=success max interval OBS_MIN=last event to next probe minutes TF=transport failures GF=gateway failures GACT=gateway freeze actions");
lines.push("Raw: bun scripts/cli.ts platform-infra sub2api codex-pool sentinel-report --raw");
return lines.join("\n");
}
@@ -2973,6 +2976,12 @@ def report():
continue
probe = account_state.get("lastProbe") if isinstance(account_state.get("lastProbe"), dict) else {}
quarantine = account_state.get("quarantine") if isinstance(account_state.get("quarantine"), dict) else {}
gateway_failure = account_state.get("lastGatewayFailure") if isinstance(account_state.get("lastGatewayFailure"), dict) else {}
last_event_at = account_state.get("lastGatewayFailureAt") or account_state.get("lastProbeAt")
last_failure_kind = quarantine.get("failureKind") if quarantine.get("active") is True and quarantine.get("failureKind") else probe.get("failureKind")
last_action = action_type(probe)
if gateway_failure and (not account_state.get("lastProbeAt") or str(account_state.get("lastGatewayFailureAt") or "") >= str(account_state.get("lastProbeAt") or "")):
last_action = action_type(gateway_failure)
ledger = account_ledger(account_state)
account_rows.append({
"account": name,
@@ -2991,14 +3000,16 @@ def report():
"totalTokens": ledger.get("totalTokens", 0),
"estimatedCostUsd": round(float(ledger.get("estimatedCostUsd", 0)), 6),
"lastProbeAt": account_state.get("lastProbeAt"),
"lastEventAt": last_event_at,
"lastGatewayFailureAt": account_state.get("lastGatewayFailureAt"),
"lastPurpose": probe.get("purpose"),
"lastHttp": probe.get("httpStatus"),
"lastMarker": probe.get("markerMatched"),
"lastFailureKind": probe.get("failureKind"),
"lastFailureKind": last_failure_kind,
"lastErrorCode": error_code(probe),
"lastAction": action_type(probe),
"lastAction": last_action,
"nextProbeAfter": account_state.get("nextProbeAfter"),
"observedLastToNextMin": minutes_between(account_state.get("lastProbeAt"), account_state.get("nextProbeAfter")),
"observedLastToNextMin": minutes_between(last_event_at, account_state.get("nextProbeAfter")),
"requestShape": probe.get("requestShape"),
})
run_rows = []
@@ -3006,6 +3017,7 @@ def report():
if not isinstance(item, dict):
continue
selection = item.get("selection") if isinstance(item.get("selection"), dict) else {}
gateway = item.get("gatewayFailureMonitor") if isinstance(item.get("gatewayFailureMonitor"), dict) else {}
run_rows.append({
"at": item.get("at"),
"selected": item.get("selected"),
@@ -3014,6 +3026,8 @@ def report():
"mismatch": item.get("mismatchCount") if item.get("mismatchCount") is not None else item.get("markerMismatchCount"),
"transportFailures": item.get("transportFailureCount"),
"actionsTaken": item.get("actionsTaken"),
"gatewayFailures": gateway.get("newFailures"),
"gatewayActions": gateway.get("actionsTaken"),
"reasserts": len(item.get("reconcile") or []),
})
quarantined = [item for item in account_rows if item.get("quarantineActive") is True]