fix: verify sub2api temp unschedulable routing

This commit is contained in:
Codex
2026-06-09 08:24:27 +00:00
parent b55320c185
commit 33e0abb142
4 changed files with 223 additions and 29 deletions
+3
View File
@@ -27,6 +27,7 @@
- `pool.apiKeySecretName` and `pool.apiKeySecretKey` name the k3s Secret that stores the single consumer API key.
- `pool.defaultTempUnschedulable` declares Sub2API account-level temporary unschedulable rules. Keep 429/overload/capacity failures in this YAML policy so the scheduler can cool down a failing account and choose another candidate instead of hard-pinning one provider.
- `profiles.entries` selects local Codex profile files from `~/.codex/` and maps them to Sub2API account names.
- `profiles.entries[].tempUnschedulable` may override the pool default for one account. The CLI renders it into Sub2API credentials as `temp_unschedulable_enabled` and `temp_unschedulable_rules`; rules match HTTP status plus response-body keywords and place only that account into a temporary unschedulable cooldown.
- `profiles.entries[].openaiResponsesWebSocketsV2Mode` is the account-level Responses WebSocket v2 switch for OpenAI-compatible upstreams that require WebSocket transport. Allowed values are `off`, `ctx_pool`, and `passthrough`; omit the field unless that upstream needs it.
- `profiles.entries[].upstreamUserAgent` is an optional account-level upstream request User-Agent override. Use it only for upstreams that require a Codex CLI compatible User-Agent; keep the value YAML-controlled and newline-free.
- `publicExposure` controls the optional FRP bridge from master server to the G14 ClusterIP service.
@@ -34,6 +35,8 @@
Enable account-level WebSocket v2 only for upstream profiles that have passed a direct Codex WSv2 probe. Treat this as a YAML-declared capability set, not a hard scheduling pin to one profile; `codex-pool validate` must show at least one current `webSocketsV2.schedulableEnabled` account, and runtime smoke remains the availability proof. The same validation reports each managed account's runtime WebSocket v2 mode and whether it matches YAML, so stale `ctx_pool` settings cannot silently keep routing Codex WS sessions to an upstream that closes with `no available account`.
Do not enable Sub2API `pool_mode` for UniDesk-managed Codex accounts. `pool_mode` retries the same selected account path, while UniDesk's desired failover behavior is to mark the failing account temporarily unschedulable and let Sub2API choose another account from the group. `codex-pool validate` reports each managed account's temporary-unschedulable runtime alignment and should be used after `codex-pool sync --confirm`.
The request path is:
1. A client sends an OpenAI-compatible request to the configured consumer base URL, normally master-local `http://127.0.0.1:<frp-port>/v1/...`, with the unified API key.
@@ -0,0 +1,52 @@
import { defaultCodexTempUnschedulablePolicy, renderSub2ApiTempUnschedulableCredentials } from "./src/platform-infra-sub2api-codex";
/**
 * Throws when `condition` is falsy.
 *
 * The thrown `Error` message is `message`, a colon, and `detail` rendered as
 * JSON so a failing contract check shows the offending value.
 *
 * @param condition - Value checked for truthiness.
 * @param message - Human-readable description of the violated expectation.
 * @param detail - Extra context appended to the message; defaults to `{}`.
 * @throws Error when `condition` is falsy.
 */
function assertCondition(condition: unknown, message: string, detail: unknown = {}): void {
  if (condition) return;
  let renderedDetail: string;
  try {
    renderedDetail = JSON.stringify(detail);
  } catch {
    // JSON.stringify throws on circular structures and BigInt values. Fall back
    // to a rendering that cannot throw so the assertion message is never
    // replaced by a serialization TypeError.
    renderedDetail = typeof detail === "bigint" ? detail.toString() : Object.prototype.toString.call(detail);
  }
  throw new Error(`${message}: ${renderedDetail}`);
}
// Contract check: render the default UniDesk temporary-unschedulable policy and
// verify the result uses the Sub2API credential field names asserted below.
// Any failed expectation throws through assertCondition.
const policy = defaultCodexTempUnschedulablePolicy();
// The renderer returns Record<string, unknown>; this cast names only the
// fields this script inspects.
const credentials = renderSub2ApiTempUnschedulableCredentials(policy) as {
temp_unschedulable_enabled?: boolean;
temp_unschedulable_rules?: Array<{
error_code?: number;
keywords?: string[];
duration_minutes?: number;
description?: string;
}>;
pool_mode?: unknown;
};
// Fall back to an empty list so the lookups below never dereference undefined.
const rules = credentials.temp_unschedulable_rules ?? [];
const overload429 = rules.find((rule) => rule.error_code === 429);
const overloaded529 = rules.find((rule) => rule.error_code === 529);
// Shape of the rendered credentials for the default (enabled) policy.
assertCondition(credentials.temp_unschedulable_enabled === true, "default policy must enable Sub2API temporary unschedulable mode", credentials);
assertCondition(Array.isArray(credentials.temp_unschedulable_rules), "Sub2API rules must be rendered as an array", credentials);
assertCondition(rules.length === policy.rules.length, "rendered rule count must match the UniDesk policy", { policy, credentials });
// Status-specific expectations for the 429 and 529 rules.
assertCondition(overload429 !== undefined, "rendered policy must include 429 handling", rules);
assertCondition(overload429?.keywords?.includes("capacity"), "429 handling must catch capacity text", overload429);
assertCondition(overload429?.keywords?.includes("too many requests"), "429 handling must catch rate-limit text", overload429);
assertCondition(overload429?.duration_minutes === 10, "429 cooldown must match the default policy", overload429);
assertCondition(overloaded529 !== undefined, "rendered policy must include provider overloaded status 529", rules);
// Every rule must carry the snake_case field names with usable values.
assertCondition(rules.every((rule) => typeof rule.error_code === "number"), "rules must use Sub2API error_code field names", rules);
assertCondition(rules.every((rule) => typeof rule.duration_minutes === "number"), "rules must use Sub2API duration_minutes field names", rules);
assertCondition(rules.every((rule) => Array.isArray(rule.keywords) && rule.keywords.length > 0), "rules must include non-empty keyword lists", rules);
// The rendering must not carry pool_mode, API keys, or endpoints.
assertCondition(!("pool_mode" in credentials), "pool_mode must not be enabled because it retries the same account instead of cooling it down", credentials);
assertCondition(!("api_key" in credentials) && !("base_url" in credentials), "temporary-unschedulable rendering must not include secrets or endpoints", credentials);
// A disabled policy must render an explicit false flag and an empty rule list,
// even when the policy object still carries rules.
const disabled = renderSub2ApiTempUnschedulableCredentials({ enabled: false, rules: policy.rules }) as {
temp_unschedulable_enabled?: boolean;
temp_unschedulable_rules?: unknown[];
};
assertCondition(disabled.temp_unschedulable_enabled === false, "disabled policy must explicitly disable Sub2API temporary unschedulable mode", disabled);
assertCondition(Array.isArray(disabled.temp_unschedulable_rules) && disabled.temp_unschedulable_rules.length === 0, "disabled policy must not leave stale rules active", disabled);
// Reached only when every assertion above passed: print a JSON summary to stdout.
console.log(JSON.stringify({
ok: true,
checks: [
"temporary unschedulable policy renders to Sub2API credential field names",
"capacity/rate-limit errors cool down an account without enabling pool_mode",
"disabled policies clear runtime rules",
],
}));
+3
View File
@@ -17,6 +17,7 @@ const syntaxFiles = [
"scripts/playwright-cli-contract-test.ts",
"scripts/platform-infra-sub2api-codex-local-config-contract-test.ts",
"scripts/platform-infra-sub2api-codex-routing-contract-test.ts",
"scripts/platform-infra-sub2api-codex-temp-unsched-contract-test.ts",
"scripts/src/playwright-cli.ts",
"scripts/src/check.ts",
"scripts/src/artifact-registry.ts",
@@ -386,6 +387,7 @@ export function runChecks(config: UniDeskConfig, options: CheckOptions = default
fileItem("scripts/playwright-cli-contract-test.ts"),
fileItem("scripts/platform-infra-sub2api-codex-local-config-contract-test.ts"),
fileItem("scripts/platform-infra-sub2api-codex-routing-contract-test.ts"),
fileItem("scripts/platform-infra-sub2api-codex-temp-unsched-contract-test.ts"),
fileItem("scripts/code-queue-pr-preflight-example.ts"),
fileItem("scripts/schedule-cli-contract-test.ts"),
fileItem("scripts/server-cleanup-plan-contract-test.ts"),
@@ -452,6 +454,7 @@ export function runChecks(config: UniDeskConfig, options: CheckOptions = default
items.push(commandItem("playwright:cli-wrapper-contract", ["bun", "scripts/playwright-cli-contract-test.ts"], 30_000));
items.push(commandItem("platform-infra:sub2api-codex-local-config-contract", ["bun", "scripts/platform-infra-sub2api-codex-local-config-contract-test.ts"], 30_000));
items.push(commandItem("platform-infra:sub2api-codex-routing-contract", ["bun", "scripts/platform-infra-sub2api-codex-routing-contract-test.ts"], 30_000));
items.push(commandItem("platform-infra:sub2api-codex-temp-unsched-contract", ["bun", "scripts/platform-infra-sub2api-codex-temp-unsched-contract-test.ts"], 30_000));
items.push(commandItem("auth-broker:p0-contract", ["bun", "scripts/auth-broker-contract-test.ts"], 30_000));
items.push(commandItem("d601:recovery-guardrails-contract", ["bun", "scripts/d601-recovery-guardrails-contract-test.ts"], 30_000));
items.push(commandItem("hwlab:cd-wrapper-contract", ["bun", "scripts/hwlab-cd-wrapper-contract-test.ts"], 30_000));
+165 -29
View File
@@ -281,6 +281,7 @@ async function codexPoolSync(config: UniDeskConfig, options: SyncOptions): Promi
priority: profile.priority,
capacity: profile.capacity,
tempUnschedulable: profile.tempUnschedulable,
tempUnschedulableCredentials: renderSub2ApiTempUnschedulableCredentials(profile.tempUnschedulable),
})),
};
const result = await capture(config, g14K3sRoute, ["script"], syncScript(payload, pool));
@@ -444,7 +445,7 @@ function collectCodexProfiles(): CodexProfile[] {
const pool = readCodexPoolConfig();
if (!existsSync(codexDir)) return [];
const seenAccountNames = new Set<string>();
const configs = pool.profiles.length > 0 ? pool.profiles : discoverCodexProfileConfigs(codexDir);
const configs = pool.profiles.length > 0 ? pool.profiles : discoverCodexProfileConfigs(codexDir, pool.defaultTempUnschedulable);
return configs.map((entry) => {
const resolved = resolveProfileFiles(codexDir, entry);
const profile = entry.profile;
@@ -942,6 +943,7 @@ function redactProfile(profile: CodexProfile): Record<string, unknown> {
upstreamUserAgent: profile.upstreamUserAgent,
priority: profile.priority,
capacity: profile.capacity,
tempUnschedulable: tempUnschedulableSummary(profile.tempUnschedulable),
apiKeyPresent: profile.apiKey !== null && profile.apiKey.length > 0,
apiKeyBytes: profile.apiKey === null ? 0 : Buffer.byteLength(profile.apiKey, "utf8"),
apiKeyFingerprint: profile.apiKey === null ? null : fingerprint(profile.apiKey),
@@ -952,6 +954,28 @@ function redactProfile(profile: CodexProfile): Record<string, unknown> {
};
}
/**
 * Renders a temporary-unschedulable policy into the two credential fields
 * `temp_unschedulable_enabled` and `temp_unschedulable_rules`.
 *
 * The enabled flag is true only when the policy is enabled and has at least
 * one rule. A disabled policy always renders an empty rule list. Each rule is
 * mapped from camelCase (`statusCode`, `durationMinutes`) to snake_case
 * (`error_code`, `duration_minutes`); keywords are copied and a missing
 * description becomes the empty string.
 *
 * @param policy - Policy to render.
 * @returns A fresh object holding the two credential fields.
 */
export function renderSub2ApiTempUnschedulableCredentials(policy: CodexTempUnschedulablePolicy): Record<string, unknown> {
  const renderedRules: Array<Record<string, unknown>> = [];
  if (policy.enabled) {
    for (const { statusCode, keywords, durationMinutes, description } of policy.rules) {
      renderedRules.push({
        error_code: statusCode,
        keywords: Array.from(keywords),
        duration_minutes: durationMinutes,
        description: description ?? "",
      });
    }
  }
  const isActive = policy.enabled && policy.rules.length > 0;
  return {
    temp_unschedulable_enabled: isActive,
    temp_unschedulable_rules: renderedRules,
  };
}
/**
 * Builds a compact summary of a temporary-unschedulable policy.
 *
 * @param policy - Policy to summarize.
 * @returns `enabled` (policy enabled and at least one rule), `ruleCount`, and
 *   `statusCodes`; the count and codes are zero/empty when the policy is disabled.
 */
function tempUnschedulableSummary(policy: CodexTempUnschedulablePolicy): Record<string, unknown> {
  // A disabled policy contributes no rules to the summary.
  const activeRules = policy.enabled ? policy.rules : [];
  return {
    enabled: policy.enabled && policy.rules.length > 0,
    ruleCount: activeRules.length,
    statusCodes: activeRules.map((rule) => rule.statusCode),
  };
}
function poolTarget(pool = readCodexPoolConfig()): Record<string, unknown> {
return {
route: g14K3sRoute,
@@ -1615,7 +1639,7 @@ function validateScript(pool: CodexPoolConfig): string {
function desiredAccountCapacityMap(pool: CodexPoolConfig): Record<string, number> {
const codexDir = join(homedir(), ".codex");
const seenAccountNames = new Set<string>();
const configs = pool.profiles.length > 0 ? pool.profiles : discoverCodexProfileConfigs(codexDir);
const configs = pool.profiles.length > 0 ? pool.profiles : discoverCodexProfileConfigs(codexDir, pool.defaultTempUnschedulable);
const capacities: Record<string, number> = {};
for (const entry of configs) {
const accountName = entry.accountName ?? uniqueAccountName(entry.profile, seenAccountNames);
@@ -1628,7 +1652,7 @@ function desiredAccountCapacityMap(pool: CodexPoolConfig): Record<string, number
function desiredAccountWebSocketsV2ModeMap(pool: CodexPoolConfig): Record<string, OpenAIResponsesWebSocketsV2Mode | null> {
const codexDir = join(homedir(), ".codex");
const seenAccountNames = new Set<string>();
const configs = pool.profiles.length > 0 ? pool.profiles : discoverCodexProfileConfigs(codexDir);
const configs = pool.profiles.length > 0 ? pool.profiles : discoverCodexProfileConfigs(codexDir, pool.defaultTempUnschedulable);
const modes: Record<string, OpenAIResponsesWebSocketsV2Mode | null> = {};
for (const entry of configs) {
const accountName = entry.accountName ?? uniqueAccountName(entry.profile, seenAccountNames);
@@ -1638,6 +1662,19 @@ function desiredAccountWebSocketsV2ModeMap(pool: CodexPoolConfig): Record<string
return modes;
}
/**
 * Maps each desired account name to its rendered temporary-unschedulable
 * credentials.
 *
 * Uses `pool.profiles` when it is non-empty; otherwise falls back to
 * `discoverCodexProfileConfigs` over `~/.codex` with the pool default policy.
 * An entry without an explicit `accountName` gets one from `uniqueAccountName`,
 * which is given the set of names assigned so far.
 *
 * @param pool - Pool configuration holding the profile entries and default policy.
 * @returns Account name to rendered credential fields.
 */
function desiredAccountTempUnschedulableMap(pool: CodexPoolConfig): Record<string, unknown> {
  const codexDir = join(homedir(), ".codex");
  const entries = pool.profiles.length > 0
    ? pool.profiles
    : discoverCodexProfileConfigs(codexDir, pool.defaultTempUnschedulable);
  const usedNames = new Set<string>();
  const renderedByAccount: Record<string, unknown> = {};
  for (const { accountName, profile, tempUnschedulable } of entries) {
    // Entries are processed in order because each derived name depends on the names already taken.
    const resolvedName = accountName ?? uniqueAccountName(profile, usedNames);
    usedNames.add(resolvedName);
    renderedByAccount[resolvedName] = renderSub2ApiTempUnschedulableCredentials(tempUnschedulable);
  }
  return renderedByAccount;
}
function remotePythonScript(mode: "sync" | "validate", encodedPayload: string, pool: CodexPoolConfig): string {
return `
set -u
@@ -1664,6 +1701,7 @@ MIN_OWNER_BALANCE_USD = ${JSON.stringify(pool.minOwnerBalanceUsd)}
POOL_DEFAULT_ACCOUNT_CAPACITY = ${JSON.stringify(pool.defaultAccountCapacity)}
EXPECTED_ACCOUNT_CAPACITIES = ${JSON.stringify(desiredAccountCapacityMap(pool))}
EXPECTED_ACCOUNT_WS_MODES = json.loads(${JSON.stringify(JSON.stringify(desiredAccountWebSocketsV2ModeMap(pool)))})
EXPECTED_ACCOUNT_TEMP_UNSCHEDULABLE = json.loads(${JSON.stringify(JSON.stringify(desiredAccountTempUnschedulableMap(pool)))})
MODE = "${mode}"
PAYLOAD_B64 = "${encodedPayload}"
@@ -1860,30 +1898,10 @@ def list_accounts(token):
return extract_items(data)
def temp_unschedulable_credentials(profile):
policy = profile.get("tempUnschedulable") or {}
enabled = policy.get("enabled") is True
rules = []
if enabled:
for rule in policy.get("rules") or []:
status_code = rule.get("statusCode") if isinstance(rule, dict) else None
duration_minutes = rule.get("durationMinutes") if isinstance(rule, dict) else None
keywords = rule.get("keywords") if isinstance(rule, dict) else None
if not isinstance(status_code, int) or not isinstance(duration_minutes, int) or not isinstance(keywords, list):
continue
clean_keywords = [str(item) for item in keywords if isinstance(item, str) and item.strip()]
if not clean_keywords:
continue
description = rule.get("description") if isinstance(rule.get("description"), str) else ""
rules.append({
"error_code": status_code,
"keywords": clean_keywords,
"duration_minutes": duration_minutes,
"description": description,
})
return {
"enabled": enabled and len(rules) > 0,
"rules": rules,
}
credentials = profile.get("tempUnschedulableCredentials")
if not isinstance(credentials, dict):
credentials = {}
return normalize_temp_unschedulable_credentials(credentials)
def account_payload(profile, group_id):
extra = {
@@ -2105,6 +2123,120 @@ def validate_gateway(api_key):
"valuesPrinted": False,
}
def bool_value(value):
    # Coerce a stored flag to a strict bool.
    # Real booleans pass through unchanged; a string counts as True only when
    # it spells "true" in any letter case; every other value is False.
    if isinstance(value, bool):
        return value
    return isinstance(value, str) and value.lower() == "true"
def normalize_temp_unschedulable_credentials(credentials):
    # Reduce a credentials mapping to a canonical {"enabled", "rules"} pair so
    # two sets of credentials can be compared with ==.
    #
    # credentials: mapping that may hold temp_unschedulable_enabled and
    #   temp_unschedulable_rules; any non-dict input is treated as empty.
    # Returns a dict with:
    #   "enabled": True only when the flag is on and at least one rule is valid.
    #   "rules":   the valid rules when the flag is on, otherwise an empty list.
    if not isinstance(credentials, dict):
        credentials = {}
    enabled = bool_value(credentials.get("temp_unschedulable_enabled"))
    raw_rules = credentials.get("temp_unschedulable_rules")
    if isinstance(raw_rules, str):
        # Rules may arrive as a JSON-encoded string; unparsable text counts as no rules.
        try:
            raw_rules = json.loads(raw_rules)
        except json.JSONDecodeError:
            raw_rules = []

    def is_plain_int(value):
        # bool is a subclass of int, so True/False would otherwise be accepted
        # as a status code or a duration.
        return isinstance(value, int) and not isinstance(value, bool)

    rules = []
    if isinstance(raw_rules, list):
        for rule in raw_rules:
            if not isinstance(rule, dict):
                continue
            # Accept both the snake_case and the camelCase field names.
            error_code = rule.get("error_code", rule.get("statusCode"))
            duration_minutes = rule.get("duration_minutes", rule.get("durationMinutes"))
            keywords = rule.get("keywords")
            if not is_plain_int(error_code) or not is_plain_int(duration_minutes) or not isinstance(keywords, list):
                continue
            # Drop non-string and blank keywords; a rule with none left is skipped.
            clean_keywords = [item for item in keywords if isinstance(item, str) and item.strip()]
            if not clean_keywords:
                continue
            description = rule.get("description") if isinstance(rule.get("description"), str) else ""
            rules.append({
                "error_code": error_code,
                "keywords": clean_keywords,
                "duration_minutes": duration_minutes,
                "description": description,
            })
    return {
        "enabled": enabled and len(rules) > 0,
        "rules": rules if enabled else [],
    }
def summarize_temp_unschedulable_rules(rules):
    # Build one report entry per rule: status code, cooldown minutes, keyword
    # count, at most the first eight keywords, and whether a description is set.
    summaries = []
    for rule in rules:
        keywords = rule.get("keywords") or []
        summaries.append({
            "errorCode": rule.get("error_code"),
            "durationMinutes": rule.get("duration_minutes"),
            "keywordCount": len(keywords),
            "keywords": keywords[:8],
            "hasDescription": bool(rule.get("description")),
        })
    return summaries
def get_account_detail(token, account):
    # Fetch the admin detail record for one account.
    #
    # token:   bearer token passed to curl_api.
    # account: list entry for the account; returned unchanged when it is not a
    #          dict, has no id, or the detail response is not a dict.
    if not isinstance(account, dict):
        return account
    account_id = account.get("id")
    if account_id is None:
        return account
    response = curl_api("GET", f"/api/v1/admin/accounts/{account_id}", bearer=token)
    detail = ensure_success(response, f"get account {account.get('name')}")
    if isinstance(detail, dict):
        return detail
    return account
def account_temp_unschedulable_status(token):
    # Compare each expected account's temporary-unschedulable credentials with
    # what the runtime currently reports.
    #
    # token: bearer token used for the account list and detail lookups.
    # Returns a report dict; "ok" is True only when no expected account is
    # missing and none differs from its expected normalized credentials.
    runtime_accounts = {}
    for candidate in list_accounts(token):
        candidate_name = candidate.get("name")
        if isinstance(candidate_name, str):
            runtime_accounts[candidate_name] = candidate

    report = []
    absent = []
    drifted = []
    with_policy = []
    for account_name in sorted(EXPECTED_ACCOUNT_TEMP_UNSCHEDULABLE):
        desired = normalize_temp_unschedulable_credentials(EXPECTED_ACCOUNT_TEMP_UNSCHEDULABLE[account_name])
        found = runtime_accounts.get(account_name)
        if found is None:
            # Expected account is not in the runtime list: record it without runtime fields.
            absent.append(account_name)
            report.append({
                "accountName": account_name,
                "accountId": None,
                "expectedEnabled": desired["enabled"],
                "runtimeEnabled": None,
                "expectedRuleCount": len(desired["rules"]),
                "runtimeRuleCount": None,
                "ok": False,
            })
            continue

        detail = get_account_detail(token, found)
        stored = detail.get("credentials")
        if not isinstance(stored, dict):
            stored = {}
        actual = normalize_temp_unschedulable_credentials(stored)
        matches = actual == desired
        if desired["enabled"]:
            with_policy.append(account_name)
        if not matches:
            drifted.append(account_name)
        report.append({
            "accountName": account_name,
            "accountId": found.get("id"),
            "expectedEnabled": desired["enabled"],
            "runtimeEnabled": actual["enabled"],
            "expectedRuleCount": len(desired["rules"]),
            "runtimeRuleCount": len(actual["rules"]),
            "expectedRules": summarize_temp_unschedulable_rules(desired["rules"]),
            "runtimeRules": summarize_temp_unschedulable_rules(actual["rules"]),
            "status": found.get("status"),
            "schedulable": found.get("schedulable"),
            "ok": matches,
        })

    return {
        "ok": not absent and not drifted,
        "desired": len(EXPECTED_ACCOUNT_TEMP_UNSCHEDULABLE),
        "enabled": with_policy,
        "missing": absent,
        "mismatched": drifted,
        "items": report,
        "valuesPrinted": False,
    }
def account_capacity_status(token):
accounts = list_accounts(token)
by_name = {item.get("name"): item for item in accounts if isinstance(item.get("name"), str)}
@@ -2233,12 +2365,13 @@ def run_sync():
account_results, pruned_account_results = ensure_accounts(token, profiles, group_id)
capacity_status = account_capacity_status(token)
ws_v2_status = account_ws_v2_status(token)
temp_unschedulable_status = account_temp_unschedulable_status(token)
api_key, secret_action, secret_apply_stdout = ensure_api_key_secret(group_id)
api_key_result = ensure_sub2api_api_key(token, api_key, group_id)
owner_balance = ensure_pool_owner_balance(token, api_key_result["userId"])
gateway = validate_gateway(api_key)
return {
"ok": gateway["ok"] is True and capacity_status["ok"] is True and ws_v2_status["ok"] is True,
"ok": gateway["ok"] is True and capacity_status["ok"] is True and ws_v2_status["ok"] is True and temp_unschedulable_status["ok"] is True,
"mode": "sync",
"namespace": NAMESPACE,
"serviceDns": SERVICE_DNS,
@@ -2256,6 +2389,7 @@ def run_sync():
},
"capacity": capacity_status,
"webSocketsV2": ws_v2_status,
"tempUnschedulable": temp_unschedulable_status,
"apiKey": {
"name": POOL_API_KEY_NAME,
"secret": f"{NAMESPACE}/{POOL_API_KEY_SECRET_NAME}.{POOL_API_KEY_SECRET_KEY}",
@@ -2283,9 +2417,10 @@ def run_validate():
owner_balance = ensure_pool_owner_balance(token, key_item["user_id"])
capacity_status = account_capacity_status(token)
ws_v2_status = account_ws_v2_status(token)
temp_unschedulable_status = account_temp_unschedulable_status(token)
gateway = validate_gateway(api_key)
return {
"ok": gateway["ok"] is True and capacity_status["ok"] is True and ws_v2_status["ok"] is True,
"ok": gateway["ok"] is True and capacity_status["ok"] is True and ws_v2_status["ok"] is True and temp_unschedulable_status["ok"] is True,
"mode": "validate",
"namespace": NAMESPACE,
"serviceDns": SERVICE_DNS,
@@ -2301,6 +2436,7 @@ def run_validate():
"ownerBalance": owner_balance,
"capacity": capacity_status,
"webSocketsV2": ws_v2_status,
"tempUnschedulable": temp_unschedulable_status,
"validation": {"gatewayModels": gateway},
}