Files
pikasTech-unidesk/scripts/src/platform-infra-sub2api-codex-sentinel.ts
T
2026-07-01 15:52:22 +00:00

2142 lines
92 KiB
TypeScript

import { Buffer } from "node:buffer";
/**
 * Validated `sentinel` configuration section, as returned by `readCodexPoolSentinelConfig`.
 *
 * Numeric bounds mentioned below are the min/max arguments that reader passes to its
 * integer/number validators.
 */
export interface CodexPoolSentinelConfig {
/** When `enabled` is false the rendered CronJob is emitted with `suspend: true`. */
monitor: {
enabled: boolean;
};
/** The reader rejects `actions.enabled = true` unless `monitor.enabled` is also true. */
actions: {
enabled: boolean;
};
/** Cron expression placed in the CronJob `spec.schedule`; restricted by the reader to a small character set. */
schedule: string;
/** Base image reference; the runtime image tag is derived from it together with `sdk.openaiPythonVersion`. */
image: string;
serviceAccountName: string;
configMapName: string;
credentialsSecretName: string;
stateConfigMapName: string;
cronJobName: string;
roleName: string;
roleBindingName: string;
model: string;
endpoint: "responses";
marker: {
prefix: string;
exact: boolean;
};
probe: {
/** Validated with bounds 3 and 300; also feeds the Job `activeDeadlineSeconds` calculation. */
timeoutSeconds: number;
/** Validated with bounds 1 and 128. */
maxOutputTokens: number;
/** Validated with bounds 1 and 120. */
transportRetryMinutes: number;
userAgent: string;
};
gatewayFailureMonitor: {
enabled: boolean;
/** Validated with bounds 60 and 7200. */
lookbackSeconds: number;
/** Validated with bounds 100 and 50000. */
tailLines: number;
/** Validated with bounds 1 and 1440. */
initialTtlMinutes: number;
/** Validated with bounds 1 and 1440; must be >= `initialTtlMinutes`. */
maxTtlMinutes: number;
/** Validated with bounds 1 and 10. */
backoffMultiplier: number;
paths: string[];
};
sdk: {
/** Version of the `openai` Python package the container command pins and verifies before running. */
openaiPythonVersion: string;
};
cadence: {
/** Validated with bounds 1 and 1440. */
successInitialIntervalMinutes: number;
/** Validated with bounds 1 and 1440; must be >= `successInitialIntervalMinutes`. */
successMaxIntervalMinutes: number;
/** Validated with bounds 1 and 1440; must be >= `successInitialIntervalMinutes`. */
trustedSuccessMaxIntervalMinutes: number;
/** Validated with bounds 1 and 1440; must be >= `successInitialIntervalMinutes`. */
untrustedSuccessMaxIntervalMinutes: number;
/** Validated with bounds 1 and 10. */
successBackoffMultiplier: number;
/** Validated with bounds 0 and 50. */
jitterPercent: number;
};
freeze: {
/** Validated with bounds 1 and 1440. */
initialTtlMinutes: number;
/** Validated with bounds 1 and 1440; must be >= `initialTtlMinutes`. */
maxTtlMinutes: number;
/** Validated with bounds 1 and 10. */
backoffMultiplier: number;
/** Validated with bounds 0 and 50. */
jitterPercent: number;
};
/** Both prices are validated with bounds 0 and 100000; the summary reports accounting as "record-only". */
pricing: {
usdPer1MInputTokens: number;
usdPer1MOutputTokens: number;
};
/** Validated with bounds 1 and 2000; passed to the runner as `state.historyLimit`. */
historyLimit: number;
/**
 * Optional list of account names. `readCodexPoolSentinelConfig` does not populate it;
 * the manifest renderer forwards it to the runner config, defaulting to an empty list.
 */
protectedManualAccounts?: string[];
}
/** Image coordinates produced by `codexPoolSentinelRuntimeImage`. */
export interface CodexPoolSentinelImageTarget {
/** The configured base image (`config.image`), unchanged. */
baseImage: string;
/** Full runtime image reference in the form `<repository>:<tag>`. */
runtimeImage: string;
/** Repository part of the runtime image, without a tag. */
repository: string;
/** Tag derived from the base image and the pinned openai Python version. */
tag: string;
}
/**
 * One upstream profile entry. The manifest renderer serializes the list of these as
 * `{ profiles: [...] }` JSON into the `profiles.json` key of the credentials Secret.
 */
export interface CodexPoolSentinelProfileSecret {
// NOTE(review): presumably the Sub2API account name this profile maps to — confirm against the runner.
accountName: string;
profile: string;
/** Upstream base URL; the embedded runner appends `/v1` when the URL does not already end with it. */
baseUrl: string;
/** Upstream API key; the embedded runner passes it to the OpenAI client as `api_key`. */
apiKey: string;
/** When set, the embedded runner sends it as the probe `User-Agent` in preference to `probe.userAgent`. */
upstreamUserAgent: string | null;
// NOTE(review): no consumer is visible in this part of the file; presumably selects the trusted vs. untrusted cadence — confirm.
trustUpstream: boolean;
// NOTE(review): consumer not visible in this part of the file — confirm how the runner applies this policy.
sentinelProtect: CodexPoolSentinelProtectPolicy;
}
/**
 * Per-profile protection policy carried in `CodexPoolSentinelProfileSecret.sentinelProtect`.
 *
 * NOTE(review): the code that interprets these fields is not visible in this part of the
 * file; the field names suggest a consecutive-failure threshold with exponential retry
 * back-off — confirm against the runner before relying on that reading.
 */
export interface CodexPoolSentinelProtectPolicy {
enabled: boolean;
consecutiveFailures: number;
initialRetryDelaySeconds: number;
maxRetryDelaySeconds: number;
backoffMultiplier: number;
}
/** Deployment-specific inputs for `renderCodexPoolSentinelManifest`. */
export interface CodexPoolSentinelManifestOptions {
/** Namespace written into the metadata of every rendered object. */
namespace: string;
// NOTE(review): not referenced by the renderer in this file section — confirm whether it is still needed.
serviceName: string;
/** Host used to build the runner's service base URL as `http://<serviceDns>`. */
serviceDns: string;
/** Name of the Secret from which the CronJob reads the optional `ADMIN_PASSWORD` key. */
appSecretName: string;
/** Forwarded to the runner config as `service.adminEmailDefault`. */
adminEmailDefault: string;
/**
 * Optional proxy settings. Proxy environment variables are rendered only when
 * `httpProxy` is a non-empty string.
 */
proxy?: {
httpProxy: string;
noProxy: string;
} | null;
}
/**
 * Computes the coordinates of the sentinel runtime image for a configuration.
 *
 * The tag is derived from the configured base image and the pinned openai Python
 * version; the repository is a fixed local-registry path.
 *
 * @param config - Validated sentinel configuration.
 * @returns The base image, the full runtime image reference, its repository and its tag.
 */
export function codexPoolSentinelRuntimeImage(config: CodexPoolSentinelConfig): CodexPoolSentinelImageTarget {
  const repository = "127.0.0.1:5000/platform-infra/sub2api-account-sentinel";
  const { image: baseImage, sdk } = config;
  const tag = sentinelRuntimeImageTag(baseImage, sdk.openaiPythonVersion);
  const runtimeImage = `${repository}:${tag}`;
  return { baseImage, runtimeImage, repository, tag };
}
/**
 * Builds an image tag of the form `openai-<version>-<baseImage>` using only tag-safe characters.
 *
 * Every run of characters outside `[A-Za-z0-9_.-]` collapses to a single `-`,
 * leading dashes are dropped, and the result is cut to at most 128 characters.
 *
 * @param baseImage - Base image reference to encode into the tag.
 * @param openaiPythonVersion - Pinned openai Python package version.
 * @returns The sanitized tag.
 */
function sentinelRuntimeImageTag(baseImage: string, openaiPythonVersion: string): string {
  const rawTag = ["openai", openaiPythonVersion, baseImage].join("-");
  const dashed = rawTag.replace(/[^A-Za-z0-9_.-]+/gu, "-");
  const withoutLeadingDashes = dashed.replace(/^-+/u, "");
  return withoutLeadingDashes.slice(0, 128);
}
/**
 * Reads and validates the raw `sentinel` section into a typed configuration.
 *
 * Sections are resolved first, then every field is read in declaration order, and
 * finally the cross-field constraints are checked; the first failure is thrown.
 *
 * @param value - Untyped input expected to be an object holding the sentinel settings.
 * @param sourcePath - Prefix used in every error message to locate the offending key.
 * @returns The validated configuration (`protectedManualAccounts` is not populated here).
 * @throws Error when `value` is not an object, when a field reader rejects its input,
 *   or when a cross-field constraint is violated.
 */
export function readCodexPoolSentinelConfig(value: unknown, sourcePath: string): CodexPoolSentinelConfig {
  const root = `${sourcePath}.sentinel`;
  if (!isRecord(value)) {
    throw new Error(`${root} must be a YAML object`);
  }
  const doc = value;
  const at = (suffix: string): string => `${root}.${suffix}`;
  const section = (name: string) => readRequiredRecord(valueAt(doc, name), at(name));

  // Resolve every nested section up front, in this order, before reading any field.
  const monitor = section("monitor");
  const actions = section("actions");
  const marker = section("marker");
  const probe = section("probe");
  const gatewayFailureMonitor = section("gatewayFailureMonitor");
  const sdk = section("sdk");
  const cadence = section("cadence");
  const freeze = section("freeze");
  const pricing = section("pricing");

  type Section = typeof monitor;
  const flagIn = (record: Section, owner: string, key: string) =>
    readRequiredBoolean(valueAt(record, key), at(`${owner}.${key}`));
  const intIn = (record: Section, owner: string, key: string, min: number, max: number) =>
    readRequiredInt(valueAt(record, key), at(`${owner}.${key}`), min, max);
  const priceIn = (key: string) => readRequiredNumber(valueAt(pricing, key), at(`pricing.${key}`), 0, 100000);
  const dnsName = (key: string) => readRequiredDnsName(valueAt(doc, key), at(key));

  const config: CodexPoolSentinelConfig = {
    monitor: {
      enabled: flagIn(monitor, "monitor", "enabled"),
    },
    actions: {
      enabled: flagIn(actions, "actions", "enabled"),
    },
    schedule: readRequiredString(valueAt(doc, "schedule"), at("schedule")),
    image: readRequiredImage(valueAt(doc, "image"), at("image")),
    serviceAccountName: dnsName("serviceAccountName"),
    configMapName: dnsName("configMapName"),
    credentialsSecretName: dnsName("credentialsSecretName"),
    stateConfigMapName: dnsName("stateConfigMapName"),
    cronJobName: dnsName("cronJobName"),
    roleName: dnsName("roleName"),
    roleBindingName: dnsName("roleBindingName"),
    model: readRequiredModelName(valueAt(doc, "model"), at("model")),
    endpoint: readRequiredEndpoint(valueAt(doc, "endpoint"), at("endpoint")),
    marker: {
      prefix: readRequiredMarkerPrefix(valueAt(marker, "prefix"), at("marker.prefix")),
      exact: flagIn(marker, "marker", "exact"),
    },
    probe: {
      timeoutSeconds: intIn(probe, "probe", "timeoutSeconds", 3, 300),
      maxOutputTokens: intIn(probe, "probe", "maxOutputTokens", 1, 128),
      transportRetryMinutes: intIn(probe, "probe", "transportRetryMinutes", 1, 120),
      userAgent: readRequiredUserAgent(valueAt(probe, "userAgent"), at("probe.userAgent")),
    },
    gatewayFailureMonitor: {
      enabled: flagIn(gatewayFailureMonitor, "gatewayFailureMonitor", "enabled"),
      lookbackSeconds: intIn(gatewayFailureMonitor, "gatewayFailureMonitor", "lookbackSeconds", 60, 7200),
      tailLines: intIn(gatewayFailureMonitor, "gatewayFailureMonitor", "tailLines", 100, 50000),
      initialTtlMinutes: intIn(gatewayFailureMonitor, "gatewayFailureMonitor", "initialTtlMinutes", 1, 1440),
      maxTtlMinutes: intIn(gatewayFailureMonitor, "gatewayFailureMonitor", "maxTtlMinutes", 1, 1440),
      backoffMultiplier: intIn(gatewayFailureMonitor, "gatewayFailureMonitor", "backoffMultiplier", 1, 10),
      paths: readRequiredPathList(valueAt(gatewayFailureMonitor, "paths"), at("gatewayFailureMonitor.paths")),
    },
    sdk: {
      openaiPythonVersion: readRequiredOpenAiPythonVersion(valueAt(sdk, "openaiPythonVersion"), at("sdk.openaiPythonVersion")),
    },
    cadence: {
      successInitialIntervalMinutes: intIn(cadence, "cadence", "successInitialIntervalMinutes", 1, 1440),
      successMaxIntervalMinutes: intIn(cadence, "cadence", "successMaxIntervalMinutes", 1, 1440),
      trustedSuccessMaxIntervalMinutes: intIn(cadence, "cadence", "trustedSuccessMaxIntervalMinutes", 1, 1440),
      untrustedSuccessMaxIntervalMinutes: intIn(cadence, "cadence", "untrustedSuccessMaxIntervalMinutes", 1, 1440),
      successBackoffMultiplier: intIn(cadence, "cadence", "successBackoffMultiplier", 1, 10),
      jitterPercent: intIn(cadence, "cadence", "jitterPercent", 0, 50),
    },
    freeze: {
      initialTtlMinutes: intIn(freeze, "freeze", "initialTtlMinutes", 1, 1440),
      maxTtlMinutes: intIn(freeze, "freeze", "maxTtlMinutes", 1, 1440),
      backoffMultiplier: intIn(freeze, "freeze", "backoffMultiplier", 1, 10),
      jitterPercent: intIn(freeze, "freeze", "jitterPercent", 0, 50),
    },
    pricing: {
      usdPer1MInputTokens: priceIn("usdPer1MInputTokens"),
      usdPer1MOutputTokens: priceIn("usdPer1MOutputTokens"),
    },
    historyLimit: readRequiredInt(valueAt(doc, "historyLimit"), at("historyLimit"), 1, 2000),
  };

  // Cross-field constraints, listed in the order they are reported; the first violated rule wins.
  const floor = config.cadence.successInitialIntervalMinutes;
  const rules: Array<[boolean, string]> = [
    [
      config.actions.enabled && !config.monitor.enabled,
      `${at("actions.enabled")} requires sentinel.monitor.enabled=true`,
    ],
    [
      config.cadence.successMaxIntervalMinutes < floor,
      `${at("cadence.successMaxIntervalMinutes")} must be >= successInitialIntervalMinutes`,
    ],
    [
      config.cadence.trustedSuccessMaxIntervalMinutes < floor,
      `${at("cadence.trustedSuccessMaxIntervalMinutes")} must be >= successInitialIntervalMinutes`,
    ],
    [
      config.cadence.untrustedSuccessMaxIntervalMinutes < floor,
      `${at("cadence.untrustedSuccessMaxIntervalMinutes")} must be >= successInitialIntervalMinutes`,
    ],
    [
      config.freeze.maxTtlMinutes < config.freeze.initialTtlMinutes,
      `${at("freeze.maxTtlMinutes")} must be >= initialTtlMinutes`,
    ],
    [
      config.gatewayFailureMonitor.maxTtlMinutes < config.gatewayFailureMonitor.initialTtlMinutes,
      `${at("gatewayFailureMonitor.maxTtlMinutes")} must be >= initialTtlMinutes`,
    ],
    [
      !/^[-0-9A-Za-z_/*,\s]+$/u.test(config.schedule),
      `${at("schedule")} has an unsupported cron format`,
    ],
    [
      !/^[A-Za-z0-9._:/@-]+$/u.test(config.image),
      `${at("image")} has an unsupported image format`,
    ],
  ];
  for (const [violated, message] of rules) {
    if (violated) {
      throw new Error(message);
    }
  }
  return config;
}
/**
 * Produces a summary record describing a sentinel configuration.
 *
 * Nested sections such as `probe` and `cadence` are included by reference, protected
 * manual accounts are reported only as a count, and `valuesPrinted` is always `false`.
 *
 * @param config - Validated sentinel configuration.
 * @returns A plain record whose keys appear in a fixed order.
 */
export function codexPoolSentinelSummary(config: CodexPoolSentinelConfig): Record<string, unknown> {
  const summary: Record<string, unknown> = {
    monitorEnabled: config.monitor.enabled,
    actionsEnabled: config.actions.enabled,
  };

  // Fields copied through unchanged, in the order they appear in the summary.
  const passThroughKeys = [
    "schedule",
    "cronJobName",
    "roleName",
    "roleBindingName",
    "configMapName",
    "credentialsSecretName",
    "stateConfigMapName",
    "image",
    "model",
    "endpoint",
    "probe",
    "gatewayFailureMonitor",
    "sdk",
    "cadence",
    "freeze",
  ] as const;
  for (const key of passThroughKeys) {
    summary[key] = config[key];
  }

  const protectedAccounts = config.protectedManualAccounts;
  summary.protectedManualAccountCount = protectedAccounts?.length ?? 0;
  summary.accounting = { mode: "record-only", pricing: config.pricing };

  const { prefix, exact } = config.marker;
  summary.marker = { prefix, exact };
  summary.valuesPrinted = false;
  return summary;
}
/**
 * Renders the multi-document YAML manifest for the account sentinel.
 *
 * The returned text contains, in order: a Secret (`profiles.json`, the base64 of the
 * JSON-serialized `{ profiles }`), a ConfigMap (`config.json` with the runner config and
 * `sentinel.py` with the output of `sentinelRunnerPython()`), a ServiceAccount, a Role,
 * a RoleBinding and a CronJob.
 *
 * The profile API keys end up in the returned string base64-encoded only, so the
 * result should be handled as secret material.
 *
 * @param config - Validated sentinel configuration (object names, schedule, probe settings, ...).
 * @param profiles - Upstream profiles to store in the credentials Secret.
 * @param options - Namespace, service address, admin defaults and optional proxy settings.
 * @returns The manifest as a single string with documents separated by `---`.
 */
export function renderCodexPoolSentinelManifest(
config: CodexPoolSentinelConfig,
profiles: CodexPoolSentinelProfileSecret[],
options: CodexPoolSentinelManifestOptions,
): string {
const profilesJson = JSON.stringify({ profiles }, null, 2);
// Configuration document handed to the runner as config.json inside the ConfigMap.
const runnerConfig = {
monitor: config.monitor,
actions: config.actions,
service: {
baseUrl: `http://${options.serviceDns}`,
adminEmailDefault: options.adminEmailDefault,
},
model: config.model,
endpoint: config.endpoint,
marker: config.marker,
probe: config.probe,
gatewayFailureMonitor: config.gatewayFailureMonitor,
sdk: config.sdk,
cadence: config.cadence,
freeze: config.freeze,
pricing: config.pricing,
protectedManualAccounts: config.protectedManualAccounts ?? [],
state: {
configMapName: config.stateConfigMapName,
historyLimit: config.historyLimit,
},
};
// A disabled monitor still renders the CronJob, but in suspended state.
const suspend = config.monitor.enabled ? "false" : "true";
// Job deadline: probe timeout plus 240 seconds, clamped to the range 300..3600 seconds.
const activeDeadlineSeconds = Math.max(300, Math.min(3600, config.probe.timeoutSeconds + 240));
const command = sentinelContainerShellCommand(config);
// The container runs the derived runtime image, not `config.image` directly.
const runtimeImage = codexPoolSentinelRuntimeImage(config).runtimeImage;
// Proxy variables are emitted in both upper- and lower-case spellings, and only when an HTTP proxy is configured.
const proxyEnv = options.proxy?.httpProxy
? ` - name: HTTP_PROXY
value: ${JSON.stringify(options.proxy.httpProxy)}
- name: HTTPS_PROXY
value: ${JSON.stringify(options.proxy.httpProxy)}
- name: ALL_PROXY
value: ${JSON.stringify(options.proxy.httpProxy)}
- name: http_proxy
value: ${JSON.stringify(options.proxy.httpProxy)}
- name: https_proxy
value: ${JSON.stringify(options.proxy.httpProxy)}
- name: all_proxy
value: ${JSON.stringify(options.proxy.httpProxy)}
- name: NO_PROXY
value: ${JSON.stringify(options.proxy.noProxy)}
- name: no_proxy
value: ${JSON.stringify(options.proxy.noProxy)}
`
: "";
return `apiVersion: v1
kind: Secret
metadata:
name: ${config.credentialsSecretName}
namespace: ${options.namespace}
labels:
app.kubernetes.io/name: ${config.cronJobName}
app.kubernetes.io/part-of: platform-infra
app.kubernetes.io/managed-by: unidesk
unidesk.ai/secret-purpose: sub2api-account-sentinel-profiles
type: Opaque
data:
profiles.json: ${Buffer.from(profilesJson, "utf8").toString("base64")}
---
apiVersion: v1
kind: ConfigMap
metadata:
name: ${config.configMapName}
namespace: ${options.namespace}
labels:
app.kubernetes.io/name: ${config.cronJobName}
app.kubernetes.io/part-of: platform-infra
app.kubernetes.io/managed-by: unidesk
data:
config.json: |
${indentBlock(JSON.stringify(runnerConfig, null, 2), 4)}
sentinel.py: |
${indentBlock(sentinelRunnerPython(), 4)}
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: ${config.serviceAccountName}
namespace: ${options.namespace}
labels:
app.kubernetes.io/name: ${config.cronJobName}
app.kubernetes.io/part-of: platform-infra
app.kubernetes.io/managed-by: unidesk
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: ${config.roleName}
namespace: ${options.namespace}
labels:
app.kubernetes.io/name: ${config.cronJobName}
app.kubernetes.io/part-of: platform-infra
app.kubernetes.io/managed-by: unidesk
rules:
- apiGroups: [""]
resources: ["configmaps"]
verbs: ["get", "create", "update", "patch"]
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list"]
- apiGroups: [""]
resources: ["pods/log"]
verbs: ["get"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: ${config.roleBindingName}
namespace: ${options.namespace}
labels:
app.kubernetes.io/name: ${config.cronJobName}
app.kubernetes.io/part-of: platform-infra
app.kubernetes.io/managed-by: unidesk
subjects:
- kind: ServiceAccount
name: ${config.serviceAccountName}
namespace: ${options.namespace}
roleRef:
kind: Role
name: ${config.roleName}
apiGroup: rbac.authorization.k8s.io
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: ${config.cronJobName}
namespace: ${options.namespace}
labels:
app.kubernetes.io/name: ${config.cronJobName}
app.kubernetes.io/part-of: platform-infra
app.kubernetes.io/managed-by: unidesk
spec:
schedule: "${config.schedule}"
concurrencyPolicy: Forbid
suspend: ${suspend}
successfulJobsHistoryLimit: 3
failedJobsHistoryLimit: 5
jobTemplate:
spec:
ttlSecondsAfterFinished: 3600
activeDeadlineSeconds: ${activeDeadlineSeconds}
template:
metadata:
labels:
app.kubernetes.io/name: ${config.cronJobName}
app.kubernetes.io/part-of: platform-infra
app.kubernetes.io/managed-by: unidesk
spec:
serviceAccountName: ${config.serviceAccountName}
restartPolicy: Never
containers:
- name: sentinel
image: ${runtimeImage}
imagePullPolicy: IfNotPresent
command: ["sh", "-c"]
args:
- ${JSON.stringify(command)}
env:
- name: ADMIN_EMAIL
valueFrom:
configMapKeyRef:
name: sub2api-config
key: ADMIN_EMAIL
optional: true
- name: ADMIN_PASSWORD
valueFrom:
secretKeyRef:
name: ${options.appSecretName}
key: ADMIN_PASSWORD
optional: true
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
${proxyEnv}
volumeMounts:
- name: sentinel-config
mountPath: /opt/sentinel
readOnly: true
- name: sentinel-profiles
mountPath: /opt/sentinel-secrets
readOnly: true
volumes:
- name: sentinel-config
configMap:
name: ${config.configMapName}
defaultMode: 0555
- name: sentinel-profiles
secret:
secretName: ${config.credentialsSecretName}
`;
}
/**
 * Builds the `sh -c` script executed by the sentinel container.
 *
 * The script exports the pinned openai Python version, installs that exact version
 * with pip when the installed one is missing or different, re-checks the installed
 * version (exiting with a mismatch message otherwise) and finally `exec`s the runner.
 *
 * @param config - Validated sentinel configuration; only `sdk.openaiPythonVersion` is read.
 * @returns The script as newline-joined lines.
 */
export function sentinelContainerShellCommand(config: CodexPoolSentinelConfig): string {
  const quotedVersion = JSON.stringify(config.sdk.openaiPythonVersion);

  // Lines shared by both embedded Python snippets.
  const pythonPrelude = [
    "import importlib.metadata",
    "import os",
    "expected = os.environ['OPENAI_PYTHON_VERSION']",
  ];

  // First snippet exits non-zero on a missing or different version, which triggers the pip install.
  const installWhenMismatched = [
    "if ! python3 - <<'PY'",
    ...pythonPrelude,
    "try:",
    " current = importlib.metadata.version('openai')",
    "except importlib.metadata.PackageNotFoundError:",
    " current = None",
    "if current != expected:",
    " raise SystemExit(1)",
    "PY",
    "then",
    " python3 -m pip install --no-cache-dir \"openai==$OPENAI_PYTHON_VERSION\"",
    "fi",
  ];

  // Second snippet aborts the script (via set -e) when the version still does not match.
  const verifyInstalled = [
    "python3 - <<'PY'",
    ...pythonPrelude,
    "current = importlib.metadata.version('openai')",
    "if current != expected:",
    " raise SystemExit(f'openai-python-version-mismatch expected={expected} current={current}')",
    "PY",
  ];

  const scriptLines = [
    "set -eu",
    "export OPENAI_PYTHON_VERSION=" + quotedVersion,
    ...installWhenMismatched,
    ...verifyInstalled,
    "exec python3 /opt/sentinel/sentinel.py",
  ];
  return scriptLines.join("\n");
}
export function sentinelRunnerPython(): string {
return String.raw`#!/usr/bin/env python3
import base64
import hashlib
import json
import math
import os
import random
import ssl
import time
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone, timedelta
from urllib import error, parse, request
from openai import APIConnectionError, APIStatusError, APITimeoutError, OpenAI
CONFIG_PATH = "/opt/sentinel/config.json"
PROFILES_PATH = "/opt/sentinel-secrets/profiles.json"
STATE_KEY = "state.json"
def utc_now():
return datetime.now(timezone.utc)
def iso(dt):
return dt.astimezone(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
def parse_iso(value):
if not isinstance(value, str) or not value:
return None
try:
return datetime.fromisoformat(value.replace("Z", "+00:00"))
except Exception:
return None
def load_json(path):
with open(path, encoding="utf-8") as handle:
return json.load(handle)
def sha(value):
return hashlib.sha256(str(value).encode("utf-8", errors="replace")).hexdigest()[:16]
def preview(value, limit=160):
if not isinstance(value, str):
value = str(value)
value = " ".join(value.replace("\r", " ").replace("\n", " ").split())
return value[:limit]
def day_key(now):
return now.strftime("%Y-%m-%d")
def add_minutes(now, minutes, jitter_percent=0):
minutes = float(minutes)
if jitter_percent:
factor = 1 + random.uniform(-jitter_percent, jitter_percent) / 100
minutes = max(1, minutes * factor)
return now + timedelta(minutes=minutes)
def estimate_tokens(text):
if not isinstance(text, str) or not text:
return 0
return max(1, math.ceil(len(text) / 4))
class KubeClient:
def __init__(self, namespace):
self.namespace = namespace
with open("/var/run/secrets/kubernetes.io/serviceaccount/token", encoding="utf-8") as handle:
self.token = handle.read().strip()
self.base = "https://kubernetes.default.svc"
self.context = ssl.create_default_context(cafile="/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")
def api(self, method, path, payload=None):
body = None if payload is None else json.dumps(payload).encode("utf-8")
req = request.Request(
self.base + path,
data=body,
method=method,
headers={
"Authorization": "Bearer " + self.token,
"Accept": "application/json",
"Content-Type": "application/json",
},
)
try:
with request.urlopen(req, timeout=15, context=self.context) as resp:
raw = resp.read()
if not raw:
return resp.status, None
text = raw.decode("utf-8", errors="replace")
try:
return resp.status, json.loads(text)
except Exception:
return resp.status, text
except error.HTTPError as exc:
raw = exc.read()
try:
parsed = json.loads(raw.decode("utf-8")) if raw else None
except Exception:
parsed = {"message": raw.decode("utf-8", errors="replace")}
return exc.code, parsed
def get_configmap(self, name):
status, data = self.api("GET", f"/api/v1/namespaces/{self.namespace}/configmaps/{name}")
if status == 404:
return None
if status >= 300:
raise RuntimeError(f"get configmap {name} failed: {status} {data}")
return data
def create_configmap(self, name, state):
payload = {
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {
"name": name,
"namespace": self.namespace,
"labels": {
"app.kubernetes.io/part-of": "platform-infra",
"app.kubernetes.io/managed-by": "unidesk",
"unidesk.ai/state-purpose": "sub2api-account-sentinel",
},
},
"data": {STATE_KEY: json.dumps(state, ensure_ascii=False, indent=2)},
}
status, data = self.api("POST", f"/api/v1/namespaces/{self.namespace}/configmaps", payload)
if status >= 300 and status != 409:
raise RuntimeError(f"create state configmap {name} failed: {status} {data}")
return self.get_configmap(name)
def update_configmap_state(self, obj, state):
obj.setdefault("data", {})[STATE_KEY] = json.dumps(state, ensure_ascii=False, indent=2)
name = obj["metadata"]["name"]
status, data = self.api("PUT", f"/api/v1/namespaces/{self.namespace}/configmaps/{name}", obj)
if status >= 300:
raise RuntimeError(f"update state configmap {name} failed: {status} {data}")
return data
def list_pods(self, label_selector):
query = parse.urlencode({"labelSelector": label_selector})
status, data = self.api("GET", f"/api/v1/namespaces/{self.namespace}/pods?{query}")
if status >= 300:
raise RuntimeError(f"list pods failed: {status} {data}")
return data.get("items") if isinstance(data, dict) and isinstance(data.get("items"), list) else []
def pod_logs(self, pod_name, container, since_seconds, tail_lines):
query = parse.urlencode({
"container": container,
"sinceSeconds": int(since_seconds),
"tailLines": int(tail_lines),
})
status, data = self.api("GET", f"/api/v1/namespaces/{self.namespace}/pods/{url_quote(pod_name)}/log?{query}")
if status >= 300:
raise RuntimeError(f"get pod logs failed: {status} {data}")
return data if isinstance(data, str) else ""
def default_state():
return {
"version": 1,
"accounts": {},
"ledger": {},
"history": [],
}
def load_state(kube, config):
name = config["state"]["configMapName"]
obj = kube.get_configmap(name)
if obj is None:
obj = kube.create_configmap(name, default_state())
raw = (obj.get("data") or {}).get(STATE_KEY)
try:
state = json.loads(raw) if raw else default_state()
except Exception:
state = default_state()
state["stateLoadWarning"] = "invalid-state-json-reset"
state.setdefault("version", 1)
state.setdefault("accounts", {})
state.setdefault("ledger", {})
state.setdefault("history", [])
return obj, state
def http_json(method, url, headers=None, payload=None, timeout=30, max_bytes=65536):
body = None if payload is None else json.dumps(payload, separators=(",", ":")).encode("utf-8")
req = request.Request(url, data=body, method=method, headers=headers or {})
started = time.time()
try:
with request.urlopen(req, timeout=timeout) as resp:
raw = resp.read(max_bytes + 1)
too_large = len(raw) > max_bytes
if too_large:
raw = raw[:max_bytes]
text = raw.decode("utf-8", errors="replace")
parsed = None
try:
parsed = json.loads(text) if text.strip() else None
except Exception:
parsed = None
app_success = not (isinstance(parsed, dict) and parsed.get("code") not in (None, 0))
return {
"ok": 200 <= resp.status < 300 and not too_large and app_success,
"status": resp.status,
"json": parsed,
"text": text,
"tooLarge": too_large,
"durationMs": int((time.time() - started) * 1000),
}
except error.HTTPError as exc:
raw = exc.read(max_bytes + 1)
text = raw.decode("utf-8", errors="replace")
parsed = None
try:
parsed = json.loads(text) if text.strip() else None
except Exception:
parsed = None
return {
"ok": False,
"status": exc.code,
"json": parsed,
"text": text,
"tooLarge": len(raw) > max_bytes,
"durationMs": int((time.time() - started) * 1000),
"error": str(exc),
}
except Exception as exc:
return {
"ok": False,
"status": 0,
"json": None,
"text": "",
"tooLarge": False,
"durationMs": int((time.time() - started) * 1000),
"error": str(exc),
}
def find_token(value):
if isinstance(value, dict):
for key in ("access_token", "token"):
if isinstance(value.get(key), str) and value[key]:
return value[key]
for item in value.values():
found = find_token(item)
if found:
return found
if isinstance(value, list):
for item in value:
found = find_token(item)
if found:
return found
return None
def url_quote(value):
return parse.quote(str(value), safe="")
class Sub2ApiAdmin:
def __init__(self, config):
self.base = config["service"]["baseUrl"].rstrip("/")
self.email = os.environ.get("ADMIN_EMAIL") or config["service"]["adminEmailDefault"]
self.password = os.environ.get("ADMIN_PASSWORD") or ""
protected = config.get("protectedManualAccounts") if isinstance(config.get("protectedManualAccounts"), list) else []
self.protected_manual_accounts = set(str(item) for item in protected if isinstance(item, str) and item)
self.token = None
self.accounts_by_name = None
def login(self):
if self.token:
return self.token
if not self.password:
raise RuntimeError("ADMIN_PASSWORD is missing")
resp = http_json("POST", self.base + "/api/v1/auth/login", {"Content-Type": "application/json"}, {"email": self.email, "password": self.password}, timeout=15)
token = find_token(resp.get("json"))
if not resp["ok"] or not token:
raise RuntimeError("admin login failed")
self.token = token
return token
def request(self, method, path, payload=None):
token = self.login()
headers = {"Authorization": "Bearer " + token, "Content-Type": "application/json"}
resp = http_json(method, self.base + path, headers, payload, timeout=20, max_bytes=2 * 1024 * 1024)
if not resp["ok"]:
parsed = resp.get("json")
code = parsed.get("code") if isinstance(parsed, dict) else None
message = parsed.get("message") if isinstance(parsed, dict) else None
detail = f"status={resp.get('status')} tooLarge={resp.get('tooLarge')}"
if code is not None:
detail += f" code={code}"
if message:
detail += f" message={preview(message, 200)}"
raise RuntimeError(f"admin {method} {path} failed: {detail}")
parsed = resp.get("json")
if isinstance(parsed, dict) and parsed.get("code") not in (None, 0):
raise RuntimeError(f"admin {method} {path} failed: code={parsed.get('code')} message={parsed.get('message')}")
if isinstance(parsed, dict) and "data" in parsed:
return parsed["data"]
return parsed
def accounts(self):
if self.accounts_by_name is not None:
return self.accounts_by_name
data = self.request("GET", "/api/v1/admin/accounts?page=1&page_size=500&platform=openai&type=apikey&search=unidesk-codex-")
items = []
if isinstance(data, list):
items = data
elif isinstance(data, dict):
for key in ("items", "accounts"):
if isinstance(data.get(key), list):
items = data[key]
break
self.accounts_by_name = {item.get("name"): item for item in items if isinstance(item, dict) and isinstance(item.get("name"), str)}
return self.accounts_by_name
def account(self, account_name):
if self.accounts_by_name is not None and account_name in self.accounts_by_name:
return self.accounts_by_name[account_name]
data = self.request("GET", "/api/v1/admin/accounts?page=1&page_size=20&platform=openai&type=apikey&search=" + url_quote(account_name))
items = data if isinstance(data, list) else []
if isinstance(data, dict):
for key in ("items", "accounts"):
if isinstance(data.get(key), list):
items = data[key]
break
for item in items:
if isinstance(item, dict) and item.get("name") == account_name:
if self.accounts_by_name is not None:
self.accounts_by_name[account_name] = item
return item
return None
def set_schedulable(self, account_name, schedulable):
if account_name in self.protected_manual_accounts:
return {
"accountId": None,
"previousSchedulable": None,
"schedulable": None,
"skipped": True,
"reason": "protected-manual-account",
}
account = self.account(account_name)
if not account or account.get("id") is None:
raise RuntimeError(f"account {account_name} not found")
previous = account.get("schedulable")
self.request("POST", f"/api/v1/admin/accounts/{account['id']}/schedulable", {"schedulable": bool(schedulable)})
account["schedulable"] = bool(schedulable)
if self.accounts_by_name is not None:
self.accounts_by_name[account_name] = account
return {"accountId": account.get("id"), "previousSchedulable": previous, "schedulable": bool(schedulable)}
def upstream_base_url(base_url):
base = str(base_url).rstrip("/")
return base if base.endswith("/v1") else base + "/v1"
def output_text(parsed):
if isinstance(parsed, dict) and isinstance(parsed.get("output_text"), str):
return parsed["output_text"]
parts = []
output = parsed.get("output") if isinstance(parsed, dict) else None
if isinstance(output, list):
for item in output:
if not isinstance(item, dict):
continue
content = item.get("content")
if not isinstance(content, list):
continue
for block in content:
if isinstance(block, dict) and isinstance(block.get("text"), str):
parts.append(block["text"])
return "\n".join(parts)
def model_dump(value):
if hasattr(value, "model_dump"):
return value.model_dump()
if isinstance(value, dict):
return value
return {}
def body_text(value):
if isinstance(value, bytes):
return value.decode("utf-8", errors="replace")
if isinstance(value, str):
return value
try:
return json.dumps(value, ensure_ascii=False)
except Exception:
return str(value)
def redact_diagnostic(value):
if isinstance(value, dict):
redacted = {}
for key, item in value.items():
key_text = str(key)
if any(token in key_text.lower() for token in ("key", "token", "secret", "password", "credential", "authorization")):
redacted[key_text] = "[redacted]"
else:
redacted[key_text] = redact_diagnostic(item)
return redacted
if isinstance(value, list):
return [redact_diagnostic(item) for item in value[:20]]
if isinstance(value, str):
return value if len(value) <= 2000 else value[:2000] + "...[truncated]"
if isinstance(value, (int, float, bool)) or value is None:
return value
return str(value)
def selected_headers(headers):
if headers is None:
return {}
selected = {}
for key in (
"content-type",
"x-request-id",
"x-ratelimit-limit-requests",
"x-ratelimit-remaining-requests",
"x-ratelimit-reset-requests",
"cf-ray",
"server",
):
try:
value = headers.get(key)
except Exception:
value = None
if value:
selected[key] = str(value)
return selected
def openai_error_fields(body):
if not isinstance(body, dict):
return {}
error_obj = body.get("error")
if isinstance(error_obj, dict):
return {
"message": error_obj.get("message"),
"type": error_obj.get("type"),
"param": error_obj.get("param"),
"code": error_obj.get("code"),
}
return {
"message": body.get("message"),
"type": body.get("type"),
"param": body.get("param"),
"code": body.get("code"),
}
def error_details(kind, status, body=None, message=None, headers=None):
text = body_text(body)
return {
"kind": kind,
"statusCode": status,
"message": str(message) if message else None,
"openaiError": openai_error_fields(body),
"body": redact_diagnostic(body) if isinstance(body, (dict, list)) else None,
"bodyHash": sha(text),
"bodyPreview": preview(text, 1000),
"headers": selected_headers(headers),
}
def sub2api_style_input(prompt):
return [{
"role": "user",
"content": [{
"type": "input_text",
"text": prompt,
}],
}]
def sub2api_style_instructions():
return (
"You are Codex, based on GPT-5. You are running as a coding agent in the Codex CLI on a user's computer."
)
def event_type(event):
if isinstance(event, dict):
return event.get("type")
return getattr(event, "type", None)
def event_delta(event):
if isinstance(event, dict):
value = event.get("delta")
return value if isinstance(value, str) else ""
value = getattr(event, "delta", "")
return value if isinstance(value, str) else ""
def event_error_message(event):
    """Find a non-empty error message in a stream event.

    Looks first at the dumped event's top-level "error" dict, then at
    "response" -> "error". Returns None when neither holds a non-empty
    string message.
    """
    data = model_dump(event)
    if not isinstance(data, dict):
        return None
    response = data.get("response")
    nested_error = response.get("error") if isinstance(response, dict) else None
    for candidate in (data.get("error"), nested_error):
        if not isinstance(candidate, dict):
            continue
        text = candidate.get("message")
        if isinstance(text, str) and text:
            return text
    return None
def openai_responses_create(profile, config, marker, prompt):
    """Run one streaming Responses probe through the OpenAI SDK and summarize it.

    Parameters:
        profile: account profile; reads "apiKey", "baseUrl" and the optional
            "upstreamUserAgent".
        config: sentinel config; reads "model" and the "probe" section
            ("timeoutSeconds", "maxOutputTokens", optional "userAgent").
        marker: probe marker, hashed into the X-Request-ID header.
        prompt: text sent as the single user message.

    Returns a dict that always has "ok", "status", "json", "text",
    "tooLarge", "durationMs", "sdk" and "requestShape". Successful stream
    handling adds "outputText"; failures add "error" and "errorDetails".
    SDK and stream errors are reported in the dict, never raised; only
    errors raised while building headers or the client propagate.
    """
    request_shape = "sub2api-account-test-streaming-responses"
    headers = {
        "User-Agent": profile.get("upstreamUserAgent") or config["probe"].get("userAgent") or "Go-http-client/1.1",
        "X-Request-ID": "unidesk-account-sentinel-" + hashlib.sha256(marker.encode()).hexdigest()[:16],
    }
    client = OpenAI(
        api_key=profile["apiKey"],
        base_url=upstream_base_url(profile["baseUrl"]),
        timeout=float(config["probe"]["timeoutSeconds"]),
        max_retries=0,
    )
    started = time.time()

    def elapsed_ms():
        # Wall-clock milliseconds since the request was started.
        return int((time.time() - started) * 1000)

    try:
        stream = client.responses.create(
            model=config["model"],
            input=sub2api_style_input(prompt),
            instructions=sub2api_style_instructions(),
            stream=True,
            extra_headers=headers,
        )
        deltas = []
        events = []
        seen_completed = False
        collected_chars = 0
        try:
            max_chars = max(32, int(config["probe"]["maxOutputTokens"]) * 12)
            for event in stream:
                event_data = model_dump(event)
                etype = event_type(event)
                events.append({"type": etype, "preview": preview(body_text(event_data), 240)})
                if etype == "response.output_text.delta":
                    delta = event_delta(event)
                    if delta:
                        deltas.append(delta)
                        # Keep a running length instead of re-joining every delta.
                        collected_chars += len(delta)
                    if collected_chars > max_chars:
                        break
                elif etype in ("response.completed", "response.done"):
                    seen_completed = True
                    break
                elif etype in ("response.failed", "error"):
                    message = event_error_message(event) or "OpenAI response failed"
                    raise RuntimeError(message)
        finally:
            # The loop can exit early (break or raise) with the stream still
            # open; release it explicitly. Cleanup is best-effort so that a
            # close failure never replaces the probe outcome.
            close_stream = getattr(stream, "close", None)
            if callable(close_stream):
                try:
                    close_stream()
                except Exception:
                    pass
        out = "".join(deltas)
        parsed = {"stream": True, "completed": seen_completed, "events": events[-20:], "output_text": out}
        if not seen_completed:
            parsed["streamError"] = "stream ended before response.completed"
        return {
            "ok": seen_completed,
            "status": 200 if seen_completed else 0,
            "json": parsed,
            "outputText": out,
            "text": body_text(parsed),
            "tooLarge": not seen_completed and len(out) > max_chars,
            "durationMs": elapsed_ms(),
            "sdk": "openai-python",
            "requestShape": request_shape,
        }
    except APIStatusError as exc:
        status = getattr(exc, "status_code", 0) or 0
        body = getattr(exc, "body", None)
        response = getattr(exc, "response", None)
        return {
            "ok": False,
            "status": status,
            "json": body if isinstance(body, dict) else None,
            "text": body_text(body or response or ""),
            "tooLarge": False,
            "durationMs": elapsed_ms(),
            "error": str(exc),
            "errorDetails": error_details("APIStatusError", status, body, str(exc), getattr(response, "headers", None)),
            "sdk": "openai-python",
            "requestShape": request_shape,
        }
    except Exception as exc:
        # Timeouts, connection errors and any other failure (including the
        # RuntimeError raised for failed stream events) share one shape; the
        # exception class name is recorded as the error kind.
        return {
            "ok": False,
            "status": 0,
            "json": None,
            "text": "",
            "tooLarge": False,
            "durationMs": elapsed_ms(),
            "error": str(exc),
            "errorDetails": error_details(exc.__class__.__name__, 0, None, str(exc), None),
            "sdk": "openai-python",
            "requestShape": request_shape,
        }
def usage_from(parsed, prompt, out, config):
    """Derive token usage and estimated cost for one probe.

    Integer counts reported under parsed["usage"] are used as-is. Missing
    or non-int counts are estimated from the prompt/output text (total
    falls back to input + output), and the result is flagged as estimated.
    Cost uses the per-million-token prices in config["pricing"].
    """
    reported = {}
    if isinstance(parsed, dict) and isinstance(parsed.get("usage"), dict):
        reported = parsed.get("usage")
    prompt_tokens = reported.get("input_tokens")
    completion_tokens = reported.get("output_tokens")
    guessed = False
    if not isinstance(prompt_tokens, int):
        prompt_tokens, guessed = estimate_tokens(prompt), True
    if not isinstance(completion_tokens, int):
        completion_tokens, guessed = estimate_tokens(out), True
    all_tokens = reported.get("total_tokens")
    if not isinstance(all_tokens, int):
        all_tokens, guessed = prompt_tokens + completion_tokens, True
    pricing = config["pricing"]
    input_cost = prompt_tokens * float(pricing["usdPer1MInputTokens"])
    output_cost = completion_tokens * float(pricing["usdPer1MOutputTokens"])
    return {
        "inputTokens": prompt_tokens,
        "outputTokens": completion_tokens,
        "totalTokens": all_tokens,
        "estimated": guessed,
        "estimatedCostUsd": (input_cost + output_cost) / 1000000,
    }
def probe_account(profile, config, purpose):
    """Probe one account with a random marker and classify the outcome.

    A fresh marker is built from the configured prefix plus a short hash.
    The upstream is asked to echo it, and the probe counts as ok only when
    the output matches (exactly, or by containment when marker.exact is
    false). The returned record carries hashes and previews of the output
    rather than the full text, plus usage and a failureKind label.
    """
    prefix = config["marker"]["prefix"]
    seed = profile["accountName"] + str(time.time()) + str(random.random())
    marker = prefix + "_" + hashlib.sha256(seed.encode()).hexdigest()[:10]
    prompt = "Return exactly this marker and no other text: " + marker
    resp = openai_responses_create(profile, config, marker, prompt)
    parsed = resp.get("json")
    if isinstance(resp.get("outputText"), str):
        out = resp.get("outputText")
    else:
        out = output_text(parsed)
    trimmed = out.strip()
    if config["marker"].get("exact", True):
        marker_matched = trimmed == marker
    else:
        marker_matched = marker in trimmed
    usage = usage_from(parsed if isinstance(parsed, dict) else {}, prompt, out or resp.get("text", ""), config)
    http_success = isinstance(resp.get("status"), int) and 200 <= resp.get("status") < 300

    # Classify the result; the first matching rule wins.
    if marker_matched:
        failure_kind = "none"
    elif resp.get("tooLarge"):
        failure_kind = "response-too-large"
    elif not resp["ok"]:
        failure_kind = "transport-or-http-failure"
    elif http_success:
        failure_kind = "success-body-mismatch"
    else:
        failure_kind = "unknown-marker-mismatch"

    # Previews are blanked on success so matching output is not stored.
    if marker_matched:
        output_preview = ""
        body_preview = ""
    else:
        output_preview = preview(out or resp.get("text", ""), 160)
        body_preview = None
    record = {
        "accountName": profile["accountName"],
        "profile": profile.get("profile"),
        "trustUpstream": profile.get("trustUpstream") is True,
        "purpose": purpose,
        "ok": marker_matched,
        "markerMatched": marker_matched,
        "markerHash": sha(marker),
        "httpStatus": resp.get("status"),
        "transportOk": resp["ok"],
        "tooLarge": resp.get("tooLarge"),
        "durationMs": resp.get("durationMs"),
        "outputHash": sha(out),
        "outputPreview": output_preview,
        "responseBodyHash": sha(resp.get("text", "")),
        "responseBodyPreview": body_preview if marker_matched else preview(resp.get("text", ""), 1000),
        "error": resp.get("error"),
        "errorDetails": resp.get("errorDetails"),
        "usage": usage,
        "mismatch": not marker_matched,
        "transportFailure": not resp["ok"],
        "failureKind": failure_kind,
        "sdk": resp.get("sdk"),
        "requestShape": resp.get("requestShape"),
    }
    return record
def protect_policy(profile):
    """Return the profile's sentinelProtect dict when it is enabled, else None."""
    candidate = profile.get("sentinelProtect")
    if not isinstance(candidate, dict):
        candidate = {}
    if candidate.get("enabled") is True:
        return candidate
    return None
def protect_failure_threshold(policy):
    """Return the number of probe attempts allowed, at least 1.

    Falls back to 1 when consecutiveFailures is missing, falsy, or cannot
    be read or converted to int.
    """
    try:
        configured = int(policy.get("consecutiveFailures") or 1)
    except Exception:
        return 1
    return max(1, configured)
def protect_retry_delay_seconds(policy, retry_index):
    """Compute the delay in seconds before a protected retry.

    The first attempt (retry_index <= 0) has no delay. Later attempts wait
    initial * multiplier ** (retry_index - 1) seconds, capped at the
    configured maximum. Unreadable settings fall back to initial=2,
    maximum=initial and multiplier=2.
    """

    def read_int(key, fallback, floor):
        # Parse one integer setting, clamping to a floor; any failure yields the fallback.
        try:
            return max(floor, int(policy.get(key) or fallback))
        except Exception:
            return fallback

    initial = read_int("initialRetryDelaySeconds", 2, 1)
    maximum = read_int("maxRetryDelaySeconds", initial, initial)
    multiplier = read_int("backoffMultiplier", 2, 1)
    if retry_index <= 0:
        return 0
    grown = initial * (multiplier ** (retry_index - 1))
    return min(maximum, grown)
def probe_account_with_protection(profile, config, purpose):
    """Probe an account, retrying with backoff when its protect policy is enabled.

    Without an enabled policy this is a single plain probe. With one, up
    to "threshold" probes are made, sleeping the policy delay before each
    retry. The first probe whose marker matches is returned with a "pass"
    sentinelProtect summary. If every attempt fails, the last result is
    returned with a "fail" summary and an "-protect-exhausted" purpose.
    """
    policy = protect_policy(profile)
    if policy is None:
        return probe_account(profile, config, purpose)
    threshold = protect_failure_threshold(policy)
    history = []

    def verdict(failure_count, protected, decision):
        # Summary attached to the returned result; shares the attempts list.
        return {
            "enabled": True,
            "threshold": threshold,
            "attempts": history,
            "failureCount": failure_count,
            "protected": protected,
            "decision": decision,
        }

    copied_fields = (
        "ok",
        "markerMatched",
        "httpStatus",
        "durationMs",
        "failureKind",
        "outputHash",
        "responseBodyHash",
        "errorDetails",
    )
    final = None
    for retry_index in range(threshold):
        pause = protect_retry_delay_seconds(policy, retry_index)
        if pause > 0:
            time.sleep(pause)
        if retry_index == 0:
            label = purpose
        else:
            label = purpose + "-protect-retry"
        final = probe_account(profile, config, label)
        entry = {"attempt": retry_index + 1, "delaySeconds": pause}
        for field in copied_fields:
            entry[field] = final.get(field)
        history.append(entry)
        if final.get("markerMatched") is not True:
            continue
        final["sentinelProtect"] = verdict(retry_index, retry_index > 0, "pass")
        if retry_index > 0:
            final["purpose"] = purpose + "-protect-recovered"
        return final
    if final is None:
        final = probe_account(profile, config, purpose)
    final["sentinelProtect"] = verdict(len(history), False, "fail")
    final["purpose"] = purpose + "-protect-exhausted"
    return final
def ledger_for(state, now):
    """Return (day key, ledger dict) for the given time, creating a zeroed ledger if needed."""
    day = day_key(now)
    zeroed = {"inputTokens": 0, "outputTokens": 0, "totalTokens": 0, "estimatedCostUsd": 0, "requestCount": 0}
    per_day = state.setdefault("ledger", {})
    return day, per_day.setdefault(day, zeroed)
def account_day(account_state, day):
    """Return the account's usage counters for a day, creating a zeroed entry if needed."""
    zeroed = {"inputTokens": 0, "outputTokens": 0, "totalTokens": 0, "estimatedCostUsd": 0, "requestCount": 0}
    per_day = account_state.setdefault("daily", {})
    return per_day.setdefault(day, zeroed)
def runtime_temp_unschedulable_until(account):
    """Read the temporary-unschedulable deadline, accepting snake_case or camelCase keys."""
    if isinstance(account, dict):
        return account.get("temp_unschedulable_until") or account.get("tempUnschedulableUntil")
    return None
def runtime_temp_unschedulable_reason(account):
    """Read the temporary-unschedulable reason, accepting snake_case or camelCase keys."""
    if isinstance(account, dict):
        return account.get("temp_unschedulable_reason") or account.get("tempUnschedulableReason")
    return None
def runtime_recovery_due(account_state):
    """True only when the stored runtimeSchedulable flag is exactly False."""
    flag = account_state.get("runtimeSchedulable")
    return flag is False
# Copy the runtime scheduling status of every managed profile into sentinel state.
#
# Calls admin.accounts() once, writes runtime* fields into
# state["accounts"][<accountName>], stores a summary under
# state["runtimeSchedulable"], and returns a one-element list holding a
# "runtime-sync" event (ok plus counts, or ok=False plus the error text).
# A failure of admin.accounts() is recorded rather than raised.
def sync_runtime_schedulable_state(state, profiles, now, admin):
accounts_state = state.setdefault("accounts", {})
synced_at = iso(now)
summary = {
"ok": False,
"syncedAt": synced_at,
"managedAccountCount": len(profiles),
"schedulableAccounts": [],
"unschedulableAccounts": [],
"missingAccounts": [],
}
try:
runtime_accounts = admin.accounts()
except Exception as exc:
# Listing failed: schedulability is unknown, so every named profile is
# marked None (not False) and the error text is stored on each account.
error_text = str(exc)
summary["error"] = error_text
state["runtimeSchedulable"] = summary
for profile in profiles:
name = profile.get("accountName") if isinstance(profile, dict) else None
if isinstance(name, str) and name:
account_state = accounts_state.setdefault(name, {})
account_state["runtimeSchedulable"] = None
account_state["runtimeSyncedAt"] = synced_at
account_state["runtimeSyncError"] = error_text
return [{"type": "runtime-sync", "ok": False, "error": error_text}]
for profile in profiles:
name = profile.get("accountName") if isinstance(profile, dict) else None
# Profiles without a usable account name are skipped.
if not isinstance(name, str) or not name:
continue
account_state = accounts_state.setdefault(name, {})
# runtime_accounts is looked up by account name.
account = runtime_accounts.get(name)
if not isinstance(account, dict):
# The managed profile has no dict entry at runtime: reset the runtime
# fields and list it as missing.
account_state["runtimeMissing"] = True
account_state["runtimeAccountId"] = None
account_state["runtimeStatus"] = None
account_state["runtimeSchedulable"] = None
account_state["runtimeTempUnschedulableUntil"] = None
account_state["runtimeTempUnschedulableSet"] = None
account_state["runtimeSyncedAt"] = synced_at
account_state.pop("runtimeSyncError", None)
summary["missingAccounts"].append(name)
continue
temp_until = runtime_temp_unschedulable_until(account)
temp_reason = runtime_temp_unschedulable_reason(account)
# A temporary mark counts as set when either a deadline or a reason is present.
temp_set = temp_until is not None or bool(temp_reason)
schedulable = account.get("schedulable")
# Anything other than a real bool is stored as None.
if not isinstance(schedulable, bool):
schedulable = None
account_state["runtimeMissing"] = False
account_state["runtimeAccountId"] = account.get("id")
account_state["runtimeStatus"] = account.get("status")
account_state["runtimeSchedulable"] = schedulable
account_state["runtimeTempUnschedulableUntil"] = temp_until
account_state["runtimeTempUnschedulableSet"] = temp_set
account_state["runtimeSyncedAt"] = synced_at
account_state.pop("runtimeSyncError", None)
# The reason is stored as a hash and a 240-character preview, not in full.
if temp_reason:
account_state["runtimeTempUnschedulableReasonHash"] = sha(temp_reason)
account_state["runtimeTempUnschedulableReasonPreview"] = preview(temp_reason, 240)
else:
account_state.pop("runtimeTempUnschedulableReasonHash", None)
account_state.pop("runtimeTempUnschedulableReasonPreview", None)
# Accounts whose schedulable value is None appear in neither summary list.
if schedulable is True:
summary["schedulableAccounts"].append(name)
elif schedulable is False:
summary["unschedulableAccounts"].append({
"accountName": name,
"status": account.get("status"),
"tempUnschedulableSet": temp_set,
})
summary["ok"] = True
state["runtimeSchedulable"] = summary
return [{
"type": "runtime-sync",
"ok": True,
"schedulableCount": len(summary["schedulableAccounts"]),
"unschedulableCount": len(summary["unschedulableAccounts"]),
"missingCount": len(summary["missingAccounts"]),
}]
def add_usage(state, account_state, now, usage):
    """Add one request's usage to both the global daily ledger and the account's daily counters."""
    day, global_bucket = ledger_for(state, now)
    account_bucket = account_day(account_state, day)
    for bucket in (global_bucket, account_bucket):
        for counter in ("inputTokens", "outputTokens", "totalTokens"):
            bucket[counter] = int(bucket.get(counter) or 0) + int(usage.get(counter) or 0)
        running_cost = float(bucket.get("estimatedCostUsd") or 0) + float(usage.get("estimatedCostUsd") or 0)
        # Rounded to 8 decimals on every update.
        bucket["estimatedCostUsd"] = round(running_cost, 8)
        bucket["requestCount"] = int(bucket.get("requestCount") or 0) + 1
def due_time(account_state):
    """Return when the account should next be probed, or None for immediately.

    An active quarantine uses its "until" time. Otherwise an account that
    is unschedulable at runtime is due at once (None), and any other
    account uses nextProbeAfter.
    """
    quarantine = account_state.get("quarantine")
    in_quarantine = isinstance(quarantine, dict) and quarantine.get("active") is True
    if in_quarantine:
        return parse_iso(quarantine.get("until"))
    if runtime_recovery_due(account_state):
        return None
    return parse_iso(account_state.get("nextProbeAfter"))
def choose_due_profiles(profiles, state, config, now):
    """Select every profile whose probe is due and label each with a purpose.

    Returns (due items, selection summary). Items are sorted by their ISO
    dueAt string, with immediately-due items (dueAt None) first. The
    summary carries the current day's ledger for record-only budgeting.
    """
    _, ledger = ledger_for(state, now)
    selected = []
    accounts = state.setdefault("accounts", {})
    for profile in profiles:
        account_state = accounts.setdefault(profile["accountName"], {})
        when = due_time(account_state)
        if when is not None and not (when <= now):
            continue
        quarantine = account_state.get("quarantine")
        if isinstance(quarantine, dict) and quarantine.get("active") is True:
            purpose = "recovery"
        elif runtime_recovery_due(account_state):
            purpose = "runtime-recovery"
        else:
            purpose = "health"
        selected.append({"profile": profile, "purpose": purpose, "dueAt": iso(when) if when else None})
    selected.sort(key=lambda entry: entry["dueAt"] or "")
    summary = {
        "selected": len(selected),
        "due": len(selected),
        "limit": "all-due",
        "budgetMode": "record-only",
        "ledger": ledger,
    }
    return selected, summary
def forced_account_names():
    """Parse SENTINEL_ACCOUNT_NAMES (comma separated) into a set of non-empty, stripped names."""
    raw = os.environ.get("SENTINEL_ACCOUNT_NAMES") or ""
    return {piece.strip() for piece in raw.split(",") if piece.strip()}
def choose_forced_profiles(profiles, state, config, now, names):
    """Select the explicitly requested profiles regardless of their schedule.

    Parameters:
        profiles: managed profiles; each must have "accountName".
        state: sentinel state; per-account entries are created as needed.
        config: unused here; kept so the signature matches choose_due_profiles.
        now: current time, used to look up the day's ledger.
        names: collection of requested account names.

    Returns (due items, selection summary). Each item has dueAt "forced"
    and a manual-* purpose. The summary lists the requested names and
    those with no matching profile.
    """
    accounts = state.setdefault("accounts", {})
    # A set gives O(1) membership when computing the missing names below.
    found = set()
    due = []
    for profile in profiles:
        name = profile["accountName"]
        if name not in names:
            continue
        account_state = accounts.setdefault(name, {})
        quarantine = account_state.get("quarantine")
        active_quarantine = isinstance(quarantine, dict) and quarantine.get("active") is True
        purpose = "manual-recovery" if active_quarantine else "manual-runtime-recovery" if runtime_recovery_due(account_state) else "manual-health"
        due.append({"profile": profile, "purpose": purpose, "dueAt": "forced"})
        found.add(name)
    missing = sorted(name for name in names if name not in found)
    return due, {"selected": len(due), "due": len(due), "limit": "forced-accounts", "budgetMode": "record-only", "ledger": ledger_for(state, now)[1], "requestedAccounts": sorted(names), "missingAccounts": missing}
def success_max_interval(profile, config):
    """Return the maximum success interval in minutes for a profile.

    Trusted upstreams use trustedSuccessMaxIntervalMinutes and others use
    untrustedSuccessMaxIntervalMinutes. A missing or falsy value falls
    back to successMaxIntervalMinutes.
    """
    cadence = config["cadence"]
    if profile.get("trustUpstream") is True:
        specific_key = "trustedSuccessMaxIntervalMinutes"
    else:
        specific_key = "untrustedSuccessMaxIntervalMinutes"
    return int(cadence.get(specific_key) or cadence.get("successMaxIntervalMinutes"))
def next_success_interval(account_state, config, profile):
    """Return the next healthy-probe interval in minutes.

    Starts at the configured initial interval when there is no streak or
    no previous interval. Otherwise the previous interval is multiplied by
    the backoff factor and clamped to [initial, profile maximum].
    """
    streak = int(account_state.get("successStreak") or 0)
    previous = int(account_state.get("successIntervalMinutes") or 0)
    cadence = config["cadence"]
    initial = int(cadence["successInitialIntervalMinutes"])
    maximum = success_max_interval(profile, config)
    multiplier = int(cadence["successBackoffMultiplier"])
    if streak <= 0 or previous <= 0:
        return initial
    return min(maximum, max(initial, previous * multiplier))
def next_freeze_interval(account_state, config, was_recovery):
    """Return the quarantine length in minutes for a failed probe.

    A failed recovery probe grows the previous quarantine interval by the
    backoff factor, clamped to [initial, max]. Any other failure starts
    at the initial TTL.
    """
    quarantine = account_state.get("quarantine")
    if not isinstance(quarantine, dict):
        quarantine = {}
    previous = int(quarantine.get("intervalMinutes") or 0)
    freeze_cfg = config["freeze"]
    initial = int(freeze_cfg["initialTtlMinutes"])
    maximum = int(freeze_cfg["maxTtlMinutes"])
    multiplier = int(freeze_cfg["backoffMultiplier"])
    if not (was_recovery and previous > 0):
        return initial
    return min(maximum, max(initial, previous * multiplier))
# Fold one probe result into the account's state and apply any scheduling action.
#
# Always records usage and a lastProbe snapshot. A passing probe restores the
# account when it was quarantined or unschedulable at runtime, and schedules
# the next health probe. A failing probe freezes the account (or records
# "would-freeze" when actions are disabled) and opens a quarantine.
# Returns the action dict describing what was done.
def apply_result(result, state, config, now, admin, profile):
name = result["accountName"]
account_state = state.setdefault("accounts", {}).setdefault(name, {})
add_usage(state, account_state, now, result.get("usage") or {})
actions_enabled = bool(config["actions"]["enabled"])
quarantine = account_state.get("quarantine") if isinstance(account_state.get("quarantine"), dict) else None
# Both flags are captured before any state below is modified.
was_recovery = bool(quarantine and quarantine.get("active") is True)
was_runtime_recovery = runtime_recovery_due(account_state)
action = {"taken": False, "type": None}
if result.get("ok") is True:
quality_gate = account_state.get("qualityGate") if isinstance(account_state.get("qualityGate"), dict) else None
if was_recovery or was_runtime_recovery:
if actions_enabled:
try:
action_type = "restore" if was_recovery else "restore-runtime-unschedulable"
action = {"taken": True, "type": action_type, "result": admin.set_schedulable(name, True)}
account_state["runtimeSchedulable"] = True
account_state["runtimeSyncedAt"] = iso(now)
account_state.pop("runtimeSyncError", None)
except Exception as exc:
# A failed restore is recorded, not raised; the quarantine bookkeeping
# below still runs.
action = {"taken": False, "type": "restore-failed", "error": str(exc)}
if was_recovery:
account_state["quarantine"] = {"active": False, "clearedAt": iso(now), "lastApplied": quarantine.get("applied") is True}
# Reset the streak so next_success_interval starts again from the initial interval.
account_state["successStreak"] = 0
account_state["successIntervalMinutes"] = 0
elif isinstance(quarantine, dict) and quarantine.get("active") is not True:
# NOTE(review): this rewrites an already-cleared quarantine on every later
# success, moving clearedAt to now and recomputing lastApplied from a key
# ("applied") that the cleared record no longer has — confirm intended.
account_state["quarantine"] = {"active": False, "clearedAt": iso(now), "lastApplied": quarantine.get("applied") is True}
if quality_gate and quality_gate.get("pending") is True:
account_state["qualityGate"] = {**quality_gate, "pending": False, "clearedAt": iso(now)}
# The interval is computed before the streak is incremented.
interval = next_success_interval(account_state, config, profile)
account_state["successStreak"] = int(account_state.get("successStreak") or 0) + 1
account_state["successIntervalMinutes"] = interval
account_state["successMaxIntervalMinutes"] = success_max_interval(profile, config)
account_state["nextProbeAfter"] = iso(add_minutes(now, interval, int(config["cadence"]["jitterPercent"])))
account_state["lastOkAt"] = iso(now)
account_state["lastStatus"] = "ok"
else:
# NOTE(review): probe_account sets "ok" equal to "markerMatched", so for
# its results should_freeze looks always True here and the no-freeze
# branch below appears unreachable — confirm against other callers.
should_freeze = result.get("markerMatched") is not True
if should_freeze:
interval = next_freeze_interval(account_state, config, was_recovery)
until = add_minutes(now, interval, int(config["freeze"]["jitterPercent"]))
applied = False
if actions_enabled:
try:
action = {"taken": True, "type": "freeze", "result": admin.set_schedulable(name, False)}
applied = True
account_state["runtimeSchedulable"] = False
account_state["runtimeSyncedAt"] = iso(now)
except Exception as exc:
action = {"taken": False, "type": "freeze-failed", "error": str(exc)}
else:
action = {"taken": False, "type": "would-freeze"}
# The quarantine is recorded whether or not the freeze was applied;
# "applied" tells the two cases apart.
account_state["quarantine"] = {
"active": True,
"applied": applied,
"until": iso(until),
"intervalMinutes": interval,
"reason": "marker-not-matched",
"failureKind": result.get("failureKind"),
"markerHash": result.get("markerHash"),
"outputHash": result.get("outputHash"),
"responseBodyHash": result.get("responseBodyHash"),
"errorDetails": result.get("errorDetails"),
"lastBadAt": iso(now),
}
account_state["nextProbeAfter"] = iso(until)
account_state["successStreak"] = 0
account_state["successIntervalMinutes"] = 0
account_state["successMaxIntervalMinutes"] = success_max_interval(profile, config)
account_state["lastStatus"] = "quarantined"
else:
retry = int(config["probe"]["transportRetryMinutes"])
account_state["nextProbeAfter"] = iso(add_minutes(now, retry, int(config["cadence"]["jitterPercent"])))
account_state["lastStatus"] = "marker-not-matched-no-freeze"
account_state["lastFailureAt"] = iso(now)
account_state["lastProbeAt"] = iso(now)
account_state["trustUpstream"] = profile.get("trustUpstream") is True
# Snapshot of the probe stored on the account, including the action taken.
account_state["lastProbe"] = {
"ok": result.get("ok"),
"purpose": result.get("purpose"),
"trustUpstream": result.get("trustUpstream"),
"sentinelProtect": result.get("sentinelProtect"),
"successMaxIntervalMinutes": success_max_interval(profile, config),
"httpStatus": result.get("httpStatus"),
"durationMs": result.get("durationMs"),
"markerMatched": result.get("markerMatched"),
"transportOk": result.get("transportOk"),
"outputHash": result.get("outputHash"),
"outputPreview": result.get("outputPreview"),
"responseBodyHash": result.get("responseBodyHash"),
"responseBodyPreview": result.get("responseBodyPreview"),
"error": result.get("error"),
"errorDetails": result.get("errorDetails"),
"usage": result.get("usage"),
"failureKind": result.get("failureKind"),
"sdk": result.get("sdk"),
"requestShape": result.get("requestShape"),
"action": action,
}
return action
def log_line_payload(line):
    """Split a gateway log line into (timestamp, message, JSON payload).

    The payload is parsed from the first "{" to the end of the line. The
    tab-separated prefix supplies the timestamp (field 0) and the message
    (field 3, when present). Lines without "{" give (None, "", None), and
    unparseable JSON gives a None payload.
    """
    brace = line.find("{")
    if brace < 0:
        return None, "", None
    fields = line[:brace].rstrip("\t ").split("\t")
    ts = fields[0] if fields else ""
    message = fields[3] if len(fields) >= 4 else ""
    try:
        payload = json.loads(line[brace:])
    except Exception:
        payload = None
    return ts, message, payload
def gateway_monitor_paths(config):
    """Return the set of request paths the gateway failure monitor watches.

    Uses the non-empty string entries of gatewayFailureMonitor.paths when
    that is a non-empty list; otherwise the four built-in responses paths.
    """
    monitor_cfg = config.get("gatewayFailureMonitor")
    if not isinstance(monitor_cfg, dict):
        monitor_cfg = {}
    configured = monitor_cfg.get("paths")
    if isinstance(configured, list) and configured:
        return {str(entry) for entry in configured if isinstance(entry, str) and entry}
    return {"/responses", "/v1/responses", "/responses/compact", "/v1/responses/compact"}
def gateway_failure_kind(message, payload, config):
    """Classify a gateway log entry as a failure kind, or None when not relevant.

    Only entries on a monitored path that carry an account_id are
    considered. Compact-failure and failover messages are classified by
    their status code. Forward failures are classified by response flags
    and by substrings of the lower-cased error text; the first matching
    category wins.
    """
    if not isinstance(payload, dict):
        return None
    path = payload.get("path")
    if path not in gateway_monitor_paths(config) or payload.get("account_id") is None:
        return None

    def is_server_error(code):
        return isinstance(code, int) and code >= 500

    if "codex.remote_compact.failed" in message:
        if is_server_error(payload.get("status_code")):
            return "gateway-compact-final-failure"
        return None
    compact_paths = ("/responses/compact", "/v1/responses/compact")
    if "openai.upstream_failover_switching" in message and path in compact_paths:
        if is_server_error(payload.get("upstream_status")):
            return "gateway-compact-upstream-failover"
        return None
    if "openai.forward_failed" not in message:
        return None

    error_text = str(payload.get("error") or "").lower()

    def mentions(tokens):
        return any(token in error_text for token in tokens)

    if payload.get("fallback_error_response_written") is True:
        return "gateway-stream-forward-failure"
    if payload.get("upstream_error_response_already_written") is True:
        return "gateway-stream-forward-failure"

    # Ordered categories: the first whose tokens appear in the error text wins.
    categories = (
        ("gateway-stream-forward-failure", (
            "stream usage incomplete",
            "missing terminal event",
            "stream read error",
            "stream data interval timeout",
        )),
        ("gateway-session-affinity-failure", (
            "encrypted content could not be decrypted",
            "could not be verified",
            "invalid_encrypted_content",
        )),
        ("gateway-final-compatibility-failure", (
            "bad_response_status_code",
            "model_not_found",
            "no available channel for model",
            "unsupported model",
            "not support",
            "not supported",
            "payload too large",
            "request too large",
            "context length",
            "context window",
            "maximum context",
        )),
        ("gateway-final-transient-failure", (
            "upstream error: 500",
            "upstream error: 502",
            "upstream error: 503",
            "upstream error: 504",
            "upstream error: 524",
            "gateway timeout",
            "bad gateway",
            "upstream request failed",
            "unknown error",
            "context deadline exceeded",
            "context canceled",
        )),
    )
    for kind, tokens in categories:
        if mentions(tokens):
            return kind
    return None
def gateway_failure_is_observe_only(failure_kind):
    """True for the failure kinds listed below as observe-only."""
    observe_only = frozenset((
        "gateway-session-affinity-failure",
        "gateway-compact-final-failure",
        "gateway-compact-upstream-failover",
    ))
    return failure_kind in observe_only
def gateway_failure_item(ts, pod_name, payload, failure_kind):
    """Build the normalized record for one gateway failure log entry.

    The request id falls back to a hash of the sorted JSON payload when
    the log entry has none. account_id is coerced to int, or None when
    that fails.
    """
    request_id = payload.get("request_id")
    if not request_id:
        request_id = sha(json.dumps(payload, sort_keys=True, ensure_ascii=False))
    try:
        account_id = int(payload.get("account_id"))
    except Exception:
        account_id = None
    item = {
        "at": ts,
        "pod": pod_name,
        "requestId": request_id,
        "clientRequestId": payload.get("client_request_id"),
        "accountId": account_id,
        "failureKind": failure_kind,
    }
    item["path"] = payload.get("path")
    item["errorPreview"] = preview(payload.get("error"), 240)
    item["fallbackErrorResponseWritten"] = payload.get("fallback_error_response_written") is True
    item["upstreamErrorResponseAlreadyWritten"] = payload.get("upstream_error_response_already_written") is True
    item["bodyBytes"] = payload.get("body_bytes")
    item["latencyMs"] = payload.get("latency_ms")
    item["statusCode"] = payload.get("status_code")
    item["upstreamStatus"] = payload.get("upstream_status")
    return item
def trim_gateway_seen(monitor_state, now, lookback_seconds):
    """Drop expired entries from the seen-request map and return the map.

    Entries are kept for four times the lookback window, with a minimum of
    one hour. Entries with an unparseable timestamp are dropped as well. A
    non-dict seenRequestIds value is replaced with an empty dict.
    """
    seen = monitor_state.setdefault("seenRequestIds", {})
    if not isinstance(seen, dict):
        seen = {}
        monitor_state["seenRequestIds"] = seen
    retention_seconds = max(int(lookback_seconds) * 4, 3600)
    cutoff = now - timedelta(seconds=retention_seconds)
    # Iterate over a snapshot so entries can be removed while scanning.
    for request_id, seen_at in tuple(seen.items()):
        stamp = parse_iso(seen_at)
        expired = stamp is None or stamp < cutoff
        if expired:
            seen.pop(request_id, None)
    return seen
def gateway_failure_account_map(admin):
    """Map integer account ids to account names from admin.accounts().

    Accounts whose id cannot be read or converted to int are skipped.
    """
    names_by_id = {}
    for account_name, record in admin.accounts().items():
        try:
            numeric_id = int(record.get("id"))
        except Exception:
            continue
        else:
            names_by_id[numeric_id] = account_name
    return names_by_id
def next_gateway_freeze_interval(account_state, config):
    """Return the freeze length in minutes after a gateway failure.

    The previous interval is the stored gatewayFailureBackoffIntervalMinutes,
    raised to the current quarantine interval when that quarantine was
    itself caused by a gateway forward failure. With no previous interval
    the initial TTL is used; otherwise the previous value is multiplied by
    the backoff factor and clamped to [initial, max]. Defaults are 5, 30
    and 2 when the config omits them.
    """
    monitor_cfg = config.get("gatewayFailureMonitor")
    if not isinstance(monitor_cfg, dict):
        monitor_cfg = {}
    initial = int(monitor_cfg.get("initialTtlMinutes") or 5)
    maximum = int(monitor_cfg.get("maxTtlMinutes") or 30)
    multiplier = int(monitor_cfg.get("backoffMultiplier") or 2)
    previous = int(account_state.get("gatewayFailureBackoffIntervalMinutes") or 0)
    quarantine = account_state.get("quarantine")
    if not isinstance(quarantine, dict):
        quarantine = {}
    if quarantine.get("reason") == "gateway-forward-failure":
        previous = max(previous, int(quarantine.get("intervalMinutes") or 0))
    if previous > 0:
        return min(maximum, max(initial, previous * multiplier))
    return initial
# React to gateway failures observed for one account during this run.
#
# failures is the non-empty list of failure items for the account; the last
# one is treated as the latest. When the profile has an enabled protect
# policy, a confirming probe is run first: if its marker matches, the failure
# is only recorded and no freeze happens. Otherwise the account is frozen
# (or "would-freeze" is recorded when actions are disabled) with a backoff
# interval and a quarantine is opened. Returns the action dict.
def apply_gateway_failure(account_name, failures, state, config, now, admin, profile):
latest = failures[-1]
account_state = state.setdefault("accounts", {}).setdefault(account_name, {})
policy = protect_policy(profile)
protected_probe = None
if policy is not None:
# Confirm with a direct probe before acting on the gateway log evidence.
protected_probe = probe_account_with_protection(profile, config, "gateway-failure-confirm")
protected_probe["sourceGatewayFailure"] = {
"requestId": latest.get("requestId"),
"clientRequestId": latest.get("clientRequestId"),
"failureKind": latest.get("failureKind"),
"path": latest.get("path"),
"countInRun": len(failures),
}
account_state["lastProbeAt"] = iso(now)
# The confirming probe is stored as lastProbe whether it passed or failed.
account_state["lastProbe"] = {
"ok": protected_probe.get("ok"),
"purpose": protected_probe.get("purpose"),
"trustUpstream": protected_probe.get("trustUpstream"),
"sentinelProtect": protected_probe.get("sentinelProtect"),
"successMaxIntervalMinutes": success_max_interval(profile, config),
"httpStatus": protected_probe.get("httpStatus"),
"durationMs": protected_probe.get("durationMs"),
"markerMatched": protected_probe.get("markerMatched"),
"transportOk": protected_probe.get("transportOk"),
"outputHash": protected_probe.get("outputHash"),
"outputPreview": protected_probe.get("outputPreview"),
"responseBodyHash": protected_probe.get("responseBodyHash"),
"responseBodyPreview": protected_probe.get("responseBodyPreview"),
"error": protected_probe.get("error"),
"errorDetails": protected_probe.get("errorDetails"),
"usage": protected_probe.get("usage"),
"failureKind": protected_probe.get("failureKind"),
"sdk": protected_probe.get("sdk"),
"requestShape": protected_probe.get("requestShape"),
"action": {"taken": False, "type": "protect-confirm-pass" if protected_probe.get("markerMatched") is True else "protect-confirm-fail"},
"sourceGatewayFailure": protected_probe.get("sourceGatewayFailure"),
}
add_usage(state, account_state, now, protected_probe.get("usage") or {})
account_state["sentinelProtect"] = protected_probe.get("sentinelProtect")
account_state["trustUpstream"] = profile.get("trustUpstream") is True
account_state["successMaxIntervalMinutes"] = success_max_interval(profile, config)
if protected_probe.get("markerMatched") is True:
# The probe passed: treat it as a successful health check, record the
# gateway failure for reference, and return without freezing.
interval = next_success_interval(account_state, config, profile)
account_state["successStreak"] = int(account_state.get("successStreak") or 0) + 1
account_state["successIntervalMinutes"] = interval
account_state["nextProbeAfter"] = iso(add_minutes(now, interval, int(config["cadence"]["jitterPercent"])))
account_state["lastOkAt"] = iso(now)
account_state["lastStatus"] = "gateway-failure-protect-confirmed-ok"
account_state["lastGatewayFailureAt"] = iso(now)
account_state["lastGatewayFailure"] = {
"accountName": account_name,
"accountId": latest.get("accountId"),
"requestId": latest.get("requestId"),
"clientRequestId": latest.get("clientRequestId"),
"failureKind": latest.get("failureKind"),
"path": latest.get("path"),
"errorPreview": latest.get("errorPreview"),
"countInRun": len(failures),
"firstAt": failures[0].get("at"),
"lastAt": latest.get("at"),
"action": {"taken": False, "type": "protect-confirm-pass"},
"sentinelProtect": protected_probe.get("sentinelProtect"),
}
return {"taken": False, "type": "protect-confirm-pass", "sentinelProtect": protected_probe.get("sentinelProtect")}
# Reached when there is no protect policy or the confirming probe failed.
interval = next_gateway_freeze_interval(account_state, config)
until = add_minutes(now, interval, int(config["freeze"]["jitterPercent"]))
actions_enabled = bool(config["actions"]["enabled"])
applied = False
action = {"taken": False, "type": "would-freeze"}
if actions_enabled:
try:
# NOTE(review): apply_result also sets runtimeSchedulable/runtimeSyncedAt
# after a successful freeze; this path does not — confirm intended.
action = {"taken": True, "type": "freeze", "result": admin.set_schedulable(account_name, False)}
applied = True
except Exception as exc:
action = {"taken": False, "type": "freeze-failed", "error": str(exc)}
# The quarantine is recorded whether or not the freeze was applied.
account_state["quarantine"] = {
"active": True,
"applied": applied,
"until": iso(until),
"intervalMinutes": interval,
"reason": "gateway-forward-failure",
"failureKind": latest.get("failureKind") or "gateway-forward-failure",
"errorDetails": {
"kind": "Sub2APIGatewayForwardFailure",
"requestId": latest.get("requestId"),
"clientRequestId": latest.get("clientRequestId"),
"failureKind": latest.get("failureKind"),
"path": latest.get("path"),
"errorPreview": latest.get("errorPreview"),
"fallbackErrorResponseWritten": latest.get("fallbackErrorResponseWritten"),
"upstreamErrorResponseAlreadyWritten": latest.get("upstreamErrorResponseAlreadyWritten"),
"bodyBytes": latest.get("bodyBytes"),
"latencyMs": latest.get("latencyMs"),
"countInRun": len(failures),
},
"lastBadAt": iso(now),
"sentinelProtect": protected_probe.get("sentinelProtect") if isinstance(protected_probe, dict) else None,
}
account_state["nextProbeAfter"] = iso(until)
account_state["successStreak"] = 0
account_state["successIntervalMinutes"] = 0
account_state["successMaxIntervalMinutes"] = success_max_interval(profile, config)
account_state["lastStatus"] = "quarantined"
account_state["lastFailureAt"] = iso(now)
account_state["lastGatewayFailureAt"] = iso(now)
# Stored so next_gateway_freeze_interval can grow the interval next time.
account_state["gatewayFailureBackoffIntervalMinutes"] = interval
account_state["lastGatewayFailure"] = {
"accountName": account_name,
"accountId": latest.get("accountId"),
"requestId": latest.get("requestId"),
"clientRequestId": latest.get("clientRequestId"),
"failureKind": latest.get("failureKind"),
"path": latest.get("path"),
"errorPreview": latest.get("errorPreview"),
"fallbackErrorResponseWritten": latest.get("fallbackErrorResponseWritten"),
"upstreamErrorResponseAlreadyWritten": latest.get("upstreamErrorResponseAlreadyWritten"),
"bodyBytes": latest.get("bodyBytes"),
"latencyMs": latest.get("latencyMs"),
"countInRun": len(failures),
"firstAt": failures[0].get("at"),
"lastAt": latest.get("at"),
"intervalMinutes": interval,
"freezeUntil": iso(until),
"action": action,
"sentinelProtect": protected_probe.get("sentinelProtect") if isinstance(protected_probe, dict) else None,
}
return action
# Record observe-only gateway failures for one account without taking any action.
# Writes "lastGatewayAffinityFailureAt" and "lastGatewayAffinityFailure" under
# state["accounts"][account_name] (creating the nested dicts when missing) and
# returns an action descriptor whose "taken" flag is always False.
# failures must be a non-empty list: it is indexed at [-1] and [0] below.
def record_gateway_observation(account_name, failures, state, now):
# The last entry is reported as the latest failure; the visible caller sorts by "at" first.
latest = failures[-1]
account_state = state.setdefault("accounts", {}).setdefault(account_name, {})
account_state["lastGatewayAffinityFailureAt"] = iso(now)
account_state["lastGatewayAffinityFailure"] = {
"accountName": account_name,
"accountId": latest.get("accountId"),
"requestId": latest.get("requestId"),
"clientRequestId": latest.get("clientRequestId"),
"failureKind": latest.get("failureKind"),
"path": latest.get("path"),
"errorPreview": latest.get("errorPreview"),
"bodyBytes": latest.get("bodyBytes"),
"latencyMs": latest.get("latencyMs"),
"countInRun": len(failures),
"firstAt": failures[0].get("at"),
"lastAt": latest.get("at"),
"action": {
"taken": False,
"type": "observe-session-affinity-failure",
},
}
# Same descriptor as the one stored in state above.
return {"taken": False, "type": "observe-session-affinity-failure"}
# Scan recent sub2api pod logs for gateway failures and act on the new ones.
# Steps: read pod logs, collect failure candidates, drop request ids already seen,
# map each failure to a managed account that is present in profiles, then per account
# either apply the freeze path (apply_gateway_failure) or only record an observation
# (record_gateway_observation). Bookkeeping is written to state["gatewayFailureMonitor"].
# Returns a summary dict; when the monitor is not enabled in config a short
# "enabled": False summary is returned and nothing is scanned.
def run_gateway_failure_monitor(state, config, now, kube, admin, profiles):
cfg = config.get("gatewayFailureMonitor") if isinstance(config.get("gatewayFailureMonitor"), dict) else {}
if cfg.get("enabled") is not True:
return {"enabled": False, "scanned": 0, "newFailures": 0, "actions": []}
# Missing or falsy values (including 0) fall back to 900 seconds and 4000 lines.
lookback_seconds = int(cfg.get("lookbackSeconds") or 900)
tail_lines = int(cfg.get("tailLines") or 4000)
monitor_state = state.setdefault("gatewayFailureMonitor", {})
# Replace a stored value of the wrong type so the key assignments below work.
if not isinstance(monitor_state, dict):
monitor_state = {}
state["gatewayFailureMonitor"] = monitor_state
# seen is used below as a mapping of request id to timestamp.
# NOTE(review): trim_gateway_seen is defined elsewhere; presumably it prunes old entries - confirm.
seen = trim_gateway_seen(monitor_state, now, lookback_seconds)
pods = kube.list_pods("app.kubernetes.io/name=sub2api")
candidates = []
log_errors = []
for pod in pods:
metadata = pod.get("metadata") if isinstance(pod, dict) else {}
status = pod.get("status") if isinstance(pod, dict) else {}
pod_name = metadata.get("name")
if not isinstance(pod_name, str) or not pod_name:
continue
# Pods that report a phase other than Running are not scanned; a missing phase is accepted.
if status.get("phase") not in (None, "Running"):
continue
# A log read failure is recorded per pod instead of aborting the whole scan.
try:
logs = kube.pod_logs(pod_name, "sub2api", lookback_seconds, tail_lines)
except Exception as exc:
log_errors.append({"pod": pod_name, "error": str(exc)})
continue
for line in str(logs).splitlines():
ts, message, payload = log_line_payload(line)
failure_kind = gateway_failure_kind(message, payload, config)
if failure_kind is not None:
candidates.append(gateway_failure_item(ts, pod_name, payload, failure_kind))
# The account id lookup is only requested from admin when there is something to resolve.
by_id = gateway_failure_account_map(admin) if candidates else {}
profile_by_name = {item.get("accountName"): item for item in profiles if isinstance(item, dict) and isinstance(item.get("accountName"), str)}
by_account = {}
skipped = []
new_failures = []
for item in candidates:
request_id = item.get("requestId")
# Items without a usable request id get a synthetic one derived from their JSON form.
if not isinstance(request_id, str) or not request_id:
request_id = sha(json.dumps(item, sort_keys=True, ensure_ascii=False))
item["requestId"] = request_id
if request_id in seen:
continue
# Marked as seen before the account checks, so skipped items are also remembered.
seen[request_id] = iso(now)
account_name = by_id.get(item.get("accountId"))
if not account_name:
skipped.append({"requestId": request_id, "accountId": item.get("accountId"), "reason": "account-id-not-managed"})
continue
if account_name not in profile_by_name:
skipped.append({"requestId": request_id, "accountId": item.get("accountId"), "accountName": account_name, "reason": "account-not-in-profiles"})
continue
item["accountName"] = account_name
by_account.setdefault(account_name, []).append(item)
new_failures.append(item)
actions = []
# Accounts are processed in name order; each account's failures are ordered by their "at" value.
for account_name, failures in sorted(by_account.items()):
failures.sort(key=lambda item: item.get("at") or "")
profile = profile_by_name[account_name]
# Split into failures that go through the freeze path and failures that are only observed.
freeze_failures = [item for item in failures if not gateway_failure_is_observe_only(item.get("failureKind"))]
observed_failures = [item for item in failures if gateway_failure_is_observe_only(item.get("failureKind"))]
if freeze_failures:
action = apply_gateway_failure(account_name, freeze_failures, state, config, now, admin, profile)
actions.append({
"accountName": account_name,
"accountId": freeze_failures[-1].get("accountId"),
"failureCount": len(freeze_failures),
"requestId": freeze_failures[-1].get("requestId"),
"failureKind": freeze_failures[-1].get("failureKind"),
"path": freeze_failures[-1].get("path"),
"errorPreview": freeze_failures[-1].get("errorPreview"),
"taken": action.get("taken"),
"type": action.get("type"),
"error": action.get("error"),
})
if observed_failures:
action = record_gateway_observation(account_name, observed_failures, state, now)
actions.append({
"accountName": account_name,
"accountId": observed_failures[-1].get("accountId"),
"failureCount": len(observed_failures),
"requestId": observed_failures[-1].get("requestId"),
"failureKind": observed_failures[-1].get("failureKind"),
"path": observed_failures[-1].get("path"),
"errorPreview": observed_failures[-1].get("errorPreview"),
"taken": action.get("taken"),
"type": action.get("type"),
"error": action.get("error"),
})
# Persist a bounded snapshot of this run (last 20 failures/skips, last 10 log errors).
monitor_state["lastRunAt"] = iso(now)
monitor_state["lastScannedPods"] = [((pod.get("metadata") or {}).get("name")) for pod in pods if isinstance(pod, dict)]
monitor_state["lastCandidateCount"] = len(candidates)
monitor_state["lastNewFailureCount"] = len(new_failures)
monitor_state["lastActionCount"] = len(actions)
monitor_state["lastFailures"] = new_failures[-20:]
monitor_state["lastSkipped"] = skipped[-20:]
monitor_state["lastLogErrors"] = log_errors[-10:]
# scannedPods counts every listed pod, including those skipped by the name and phase checks.
return {
"enabled": True,
"lookbackSeconds": lookback_seconds,
"tailLines": tail_lines,
"scannedPods": len(pods),
"candidates": len(candidates),
"newFailures": len(new_failures),
"actionsTaken": sum(1 for item in actions if item.get("taken") is True),
"actions": actions[-20:],
"skipped": skipped[-20:],
"logErrors": log_errors[-10:],
}
# Re-apply the unschedulable flag for every account whose quarantine is still active.
# Returns a list of action records (accountName, type, ok and, on failure, error).
# Returns an empty list without touching anything when config["actions"]["enabled"] is falsy.
def reconcile_active_quarantines(state, config, now, admin):
actions = []
if not config["actions"]["enabled"]:
return actions
for name, account_state in state.setdefault("accounts", {}).items():
quarantine = account_state.get("quarantine")
# Only dict-shaped quarantines explicitly flagged active are considered.
if not isinstance(quarantine, dict) or quarantine.get("active") is not True:
continue
# Quarantines whose "until" time has been reached are skipped here; an unparsable
# or missing "until" (parse_iso returning None) does not cause a skip.
until = parse_iso(quarantine.get("until"))
if until is not None and until <= now:
continue
# A freeze that was recorded but not yet applied is applied now and marked as applied.
if quarantine.get("applied") is not True:
try:
admin.set_schedulable(name, False)
quarantine["applied"] = True
quarantine["appliedAt"] = iso(now)
actions.append({"accountName": name, "type": "apply-pending-freeze", "ok": True})
except Exception as exc:
actions.append({"accountName": name, "type": "apply-pending-freeze", "ok": False, "error": str(exc)})
continue
# Reassert the freeze; failures are reported in the action list rather than raised.
try:
admin.set_schedulable(name, False)
actions.append({"accountName": name, "type": "reassert-freeze", "ok": True})
except Exception as exc:
actions.append({"accountName": name, "type": "reassert-freeze", "ok": False, "error": str(exc)})
return actions
# One sentinel run: load config, profiles and state, reconcile freezes, run the gateway
# failure monitor (unless accounts were forced), probe the selected profiles, apply the
# results, store a run summary in state and print a JSON report to stdout.
def main():
now = utc_now()
config = load_json(CONFIG_PATH)
profiles = load_json(PROFILES_PATH).get("profiles") or []
# Falls back to the "platform-infra" namespace when POD_NAMESPACE is unset or empty.
namespace = os.environ.get("POD_NAMESPACE") or "platform-infra"
kube = KubeClient(namespace)
state_obj, state = load_state(kube, config)
admin = Sub2ApiAdmin(config)
runtime_sync = sync_runtime_schedulable_state(state, profiles, now, admin)
reconcile = runtime_sync + reconcile_active_quarantines(state, config, now, admin)
forced_names = forced_account_names()
# A forced run skips the gateway failure monitor and uses the forced profile selection.
gateway_monitor = {"enabled": False, "skipped": "forced-manual-probe"} if forced_names else run_gateway_failure_monitor(state, config, now, kube, admin, profiles)
if forced_names:
due, selection = choose_forced_profiles(profiles, state, config, now, forced_names)
else:
due, selection = choose_due_profiles(profiles, state, config, now)
results = []
actions = []
# Probing happens when the monitor is enabled or accounts were forced, and something is due.
if (config["monitor"]["enabled"] or forced_names) and due:
# One worker per due profile; results are applied in completion order.
with ThreadPoolExecutor(max_workers=max(1, len(due))) as executor:
futures = {executor.submit(probe_account_with_protection, item["profile"], config, item["purpose"]): item["profile"] for item in due}
for future in as_completed(futures):
result = future.result()
results.append(result)
profile = futures[future]
actions.append({"accountName": result["accountName"], **apply_result(result, state, config, now, admin, profile)})
history = state.setdefault("history", [])
# "mismatchCount" and "markerMismatchCount" are computed with the same expression.
run_summary = {
"at": iso(now),
"monitorEnabled": bool(config["monitor"]["enabled"]),
"actionsEnabled": bool(config["actions"]["enabled"]),
"profileCount": len(profiles),
"selected": len(due),
"okCount": sum(1 for item in results if item.get("ok") is True),
"mismatchCount": sum(1 for item in results if item.get("markerMatched") is not True),
"markerMismatchCount": sum(1 for item in results if item.get("markerMatched") is not True),
"transportFailureCount": sum(1 for item in results if item.get("transportFailure") is True),
"actionsTaken": sum(1 for item in actions if item.get("taken") is True),
"gatewayFailureMonitor": {
"enabled": gateway_monitor.get("enabled") is True,
"newFailures": gateway_monitor.get("newFailures", 0),
"actionsTaken": gateway_monitor.get("actionsTaken", 0),
"skipped": gateway_monitor.get("skipped"),
"logErrors": gateway_monitor.get("logErrors"),
},
"runtimeSchedulable": state.get("runtimeSchedulable"),
"selection": selection,
"reconcile": reconcile[-20:],
}
history.append(run_summary)
# Keep only the newest historyLimit entries.
# NOTE(review): a limit of 0 would make this slice empty and trim nothing - presumably
# config validation guarantees a positive value; confirm.
del history[:-int(config["state"]["historyLimit"])]
state["lastRun"] = run_summary
kube.update_configmap_state(state_obj, state)
# Report for the job log; each result is reduced to the fields listed here.
print(json.dumps({
"ok": True,
"summary": run_summary,
"results": [{
"accountName": item.get("accountName"),
"purpose": item.get("purpose"),
"trustUpstream": item.get("trustUpstream"),
"sentinelProtect": item.get("sentinelProtect"),
"ok": item.get("ok"),
"markerMatched": item.get("markerMatched"),
"httpStatus": item.get("httpStatus"),
"durationMs": item.get("durationMs"),
"usage": item.get("usage"),
"outputHash": item.get("outputHash"),
"outputPreview": item.get("outputPreview"),
"responseBodyHash": item.get("responseBodyHash"),
"responseBodyPreview": item.get("responseBodyPreview"),
"error": item.get("error"),
"errorDetails": item.get("errorDetails"),
"failureKind": item.get("failureKind"),
"sdk": item.get("sdk"),
"requestShape": item.get("requestShape"),
} for item in results],
"actions": actions,
"gatewayFailureMonitor": gateway_monitor,
"valuesPrinted": False,
}, ensure_ascii=False))
# Script entry point: run main(); on any exception print a JSON failure record
# (traceback limited to its last 4000 characters) and re-raise the exception.
if __name__ == "__main__":
try:
main()
except Exception as exc:
print(json.dumps({
"ok": False,
"error": str(exc),
"traceback": traceback.format_exc()[-4000:],
"valuesPrinted": False,
}, ensure_ascii=False))
raise
`;
}
/**
 * Prefixes every line of `value` with `spaces` space characters.
 *
 * Lines are separated by "\n" only; empty lines receive the prefix too.
 */
function indentBlock(value: string, spaces: number): string {
  const pad = " ".repeat(spaces);
  // Indenting each line equals indenting the start plus re-indenting after every newline.
  return pad + value.replace(/\n/g, `\n${pad}`);
}
/**
 * Returns the own property `key` of `value`, or `undefined` when the record has no
 * such own property (inherited members such as `toString` are never returned).
 */
function valueAt(value: Record<string, unknown>, key: string): unknown {
  if (!Object.prototype.hasOwnProperty.call(value, key)) {
    return undefined;
  }
  return value[key];
}
/**
 * Returns `value` unchanged when it is a plain (non-array, non-null) object.
 *
 * @throws Error naming `key` when `value` is anything else.
 */
function readRequiredRecord(value: unknown, key: string): Record<string, unknown> {
  if (isRecord(value)) {
    return value;
  }
  throw new Error(`${key} must be a YAML object`);
}
/**
 * Reads a required boolean setting.
 *
 * Accepts real booleans and the strings "true" / "false" (surrounding whitespace ignored,
 * case-sensitive).
 *
 * @throws Error when the value is missing (`null`/`undefined`) or not a boolean.
 */
function readRequiredBoolean(value: unknown, key: string): boolean {
  if (value == null) {
    throw new Error(`${key} is required`);
  }
  const normalized = typeof value === "string" ? value.trim() : value;
  if (normalized === true || normalized === "true") {
    return true;
  }
  if (normalized === false || normalized === "false") {
    return false;
  }
  throw new Error(`${key} must be a boolean`);
}
/**
 * Reads a required Kubernetes DNS label (RFC 1123 label).
 *
 * A label consists of lowercase alphanumerics and `-`, starts and ends with an
 * alphanumeric character, and is at most 63 characters long. The length limit is
 * enforced here so that an over-long name is rejected at config time instead of
 * failing later when the manifest is applied.
 *
 * @param value Raw config value.
 * @param key Config path used in error messages.
 * @returns The trimmed label.
 * @throws Error when the value is missing, not a string, or not a valid DNS label.
 */
function readRequiredDnsName(value: unknown, key: string): string {
  const text = readRequiredString(value, key);
  const maxLabelLength = 63;
  const isDnsLabel = text.length <= maxLabelLength && /^[a-z0-9]([-a-z0-9]*[a-z0-9])?$/u.test(text);
  if (!isDnsLabel) throw new Error(`${key} must be a Kubernetes DNS label`);
  return text;
}
/**
 * Reads a required model name made only of letters, digits, `.`, `_`, `:` and `-`.
 *
 * @throws Error when the value is missing, not a string, or contains other characters.
 */
function readRequiredModelName(value: unknown, key: string): string {
  const modelName = readRequiredString(value, key);
  const allowed = /^[A-Za-z0-9._:-]+$/u;
  if (allowed.test(modelName)) {
    return modelName;
  }
  throw new Error(`${key} has an unsupported model name`);
}
/**
 * Reads the required endpoint setting; the only accepted value is "responses".
 *
 * @throws Error when the value is missing, not a string, or any other endpoint.
 */
function readRequiredEndpoint(value: unknown, key: string): "responses" {
  const endpoint = readRequiredString(value, key);
  if (endpoint === "responses") {
    return endpoint;
  }
  throw new Error(`${key} must be responses`);
}
/**
 * Reads a required container image reference limited to letters, digits and the
 * characters `.`, `_`, `:`, `/`, `@` and `-`.
 *
 * @throws Error when the value is missing, not a string, or contains other characters.
 */
function readRequiredImage(value: unknown, key: string): string {
  const image = readRequiredString(value, key);
  const allowed = /^[A-Za-z0-9._:/@-]+$/u;
  if (allowed.test(image)) {
    return image;
  }
  throw new Error(`${key} has an unsupported image format`);
}
/**
 * Reads a required pinned version of the form `MAJOR.MINOR.PATCH` (digits only).
 *
 * @throws Error when the value is missing, not a string, or not in that form.
 */
function readRequiredOpenAiPythonVersion(value: unknown, key: string): string {
  const version = readRequiredString(value, key);
  const pinned = /^[0-9]+[.][0-9]+[.][0-9]+$/u;
  if (pinned.test(version)) {
    return version;
  }
  throw new Error(`${key} must be a pinned semver version like 2.41.1`);
}
/**
 * Reads a required single-line string and returns it trimmed.
 *
 * The newline check runs against the raw (untrimmed) value, so a leading or trailing
 * line break is rejected as well.
 *
 * @throws Error when the value is missing, not a string, blank, or contains CR/LF.
 */
function readRequiredString(value: unknown, key: string): string {
  if (value == null) {
    throw new Error(`${key} is required`);
  }
  if (typeof value !== "string") {
    throw new Error(`${key} must be a non-empty string`);
  }
  const trimmed = value.trim();
  if (trimmed === "") {
    throw new Error(`${key} must be a non-empty string`);
  }
  if (value.includes("\r") || value.includes("\n")) {
    throw new Error(`${key} must not contain newlines`);
  }
  return trimmed;
}
/**
 * Reads a required marker prefix: 2 to 32 characters drawn from letters, digits,
 * `_` and `-`.
 *
 * @throws Error when the value is missing, not a string, or does not match.
 */
function readRequiredMarkerPrefix(value: unknown, key: string): string {
  const prefix = readRequiredString(value, key);
  const allowed = /^[A-Za-z0-9_-]{2,32}$/u;
  if (allowed.test(prefix)) {
    return prefix;
  }
  throw new Error(`${key} must be 2-32 chars of letters, digits, _ or -`);
}
/**
 * Reads a required User-Agent string: single line and at most 200 bytes when
 * encoded as UTF-8.
 *
 * @throws Error when the value is missing, not a string, multi-line, or too long.
 */
function readRequiredUserAgent(value: unknown, key: string): string {
  const userAgent = readRequiredString(value, key);
  if (userAgent.includes("\r") || userAgent.includes("\n")) {
    throw new Error(`${key} must not contain newlines`);
  }
  // The limit is measured in encoded bytes, not in characters.
  const encodedLength = Buffer.byteLength(userAgent, "utf8");
  if (encodedLength > 200) {
    throw new Error(`${key} must be at most 200 bytes`);
  }
  return userAgent;
}
/**
 * Reads a required, non-empty list of HTTP paths.
 *
 * Each entry is trimmed and must start with `/`; duplicates are removed while the
 * order of first occurrence is kept.
 *
 * @throws Error when the list is missing, not an array, empty, or holds an invalid entry
 * (the message includes the index of the offending entry).
 */
function readRequiredPathList(value: unknown, key: string): string[] {
  if (value == null) {
    throw new Error(`${key} is required`);
  }
  if (!Array.isArray(value) || value.length === 0) {
    throw new Error(`${key} must be a non-empty string array`);
  }
  const httpPath = /^\/[A-Za-z0-9._~!$&'()*+,;=:@/-]*$/u;
  const normalize = (item: unknown, index: number): string => {
    const trimmed = typeof item === "string" ? item.trim() : "";
    if (trimmed.length === 0) {
      throw new Error(`${key}[${index}] must be a non-empty string`);
    }
    if (!httpPath.test(trimmed)) {
      throw new Error(`${key}[${index}] has an unsupported HTTP path`);
    }
    return trimmed;
  };
  return Array.from(new Set(value.map(normalize)));
}
/**
 * Reads a required integer within `[min, max]` (both inclusive).
 *
 * Numbers are used as-is; non-blank strings are converted with `Number()`.
 *
 * @throws Error when the value is missing, not an integer, or out of range.
 */
function readRequiredInt(value: unknown, key: string, min: number, max: number): number {
  if (value == null) {
    throw new Error(`${key} is required`);
  }
  let candidate = Number.NaN;
  if (typeof value === "number") {
    candidate = value;
  } else if (typeof value === "string" && value.trim() !== "") {
    candidate = Number(value);
  }
  const message = `${key} must be an integer from ${min} to ${max}`;
  if (!Number.isInteger(candidate)) {
    throw new Error(message);
  }
  if (candidate < min || candidate > max) {
    throw new Error(message);
  }
  return candidate;
}
/**
 * Reads a required finite number within `[min, max]` (both inclusive).
 *
 * Numbers are used as-is; non-blank strings are converted with `Number()`.
 *
 * @throws Error when the value is missing, not a finite number, or out of range.
 */
function readRequiredNumber(value: unknown, key: string, min: number, max: number): number {
  if (value == null) {
    throw new Error(`${key} is required`);
  }
  let candidate = Number.NaN;
  if (typeof value === "number") {
    candidate = value;
  } else if (typeof value === "string" && value.trim() !== "") {
    candidate = Number(value);
  }
  const message = `${key} must be a number from ${min} to ${max}`;
  if (!Number.isFinite(candidate)) {
    throw new Error(message);
  }
  if (candidate < min || candidate > max) {
    throw new Error(message);
  }
  return candidate;
}
/** Type guard: true for non-null objects that are not arrays. */
function isRecord(value: unknown): value is Record<string, unknown> {
  if (value === null || typeof value !== "object") {
    return false;
  }
  return !Array.isArray(value);
}