fix: harden sub2api codex failover stability
This commit is contained in:
@@ -161,7 +161,7 @@ sentinel:
|
||||
monitor:
|
||||
enabled: true
|
||||
actions:
|
||||
enabled: false
|
||||
enabled: true
|
||||
schedule: "*/1 * * * *"
|
||||
image: python:3.12-alpine
|
||||
sdk:
|
||||
|
||||
@@ -2008,6 +2008,8 @@ function compactGatewayResponsesRecent(block: unknown): unknown {
|
||||
slowFinalErrorCount: block.slowFinalErrorCount,
|
||||
contextCanceledCount: block.contextCanceledCount,
|
||||
ignoredProbeNoiseCount: block.ignoredProbeNoiseCount,
|
||||
failoverBudgetExhaustedCount: Array.isArray(block.failoverBudgetExhausted) ? block.failoverBudgetExhausted.length : undefined,
|
||||
failoverBudgetExhausted: Array.isArray(block.failoverBudgetExhausted) ? block.failoverBudgetExhausted.slice(-3).reverse() : undefined,
|
||||
recentFailovers: Array.isArray(block.recentFailovers) ? block.recentFailovers.slice(-4).reverse() : undefined,
|
||||
recentForwardFailures: Array.isArray(block.recentForwardFailures) ? block.recentForwardFailures.slice(-4).reverse().map((item) => ({
|
||||
...item,
|
||||
@@ -5134,6 +5136,51 @@ def ignored_probe_noise(items, section):
|
||||
result.append(probe)
|
||||
return result
|
||||
|
||||
def group_by_request_id(items):
    """Group log events by their "requestId" field.

    Args:
        items: Iterable of event dicts. None is treated as empty.

    Returns:
        A dict mapping each non-empty string request id to the list of
        events that carry it, in input order. Entries that are not dicts,
        or whose "requestId" is missing, empty, or not a string, are left
        out of the result.
    """
    grouped = {}
    for item in items or ():
        if not isinstance(item, dict):
            # A malformed entry cannot be attributed to any request; skip it
            # the same way entries without a usable id are skipped.
            continue
        request_id = item.get("requestId")
        if isinstance(request_id, str) and request_id:
            grouped.setdefault(request_id, []).append(item)
    return grouped
|
||||
|
||||
def failover_budget_exhausted_evidence(failovers, final_errors):
    """Collect requests that used up their failover budget and still failed.

    A request is reported when the last failover event recorded for it has
    switchCount >= maxSwitches (with maxSwitches > 0) and the final error
    recorded for the same request id has a statusCode of 500 or above.

    Args:
        failovers: Iterable of failover event dicts.
        final_errors: Iterable of final-error event dicts. When several
            share a request id, the last one in the iterable is used.

    Returns:
        A list with one summary dict per matching request, in the order the
        request ids first appear in failovers.
    """
    def is_count(value):
        # bool is a subclass of int; a JSON true/false is not a counter.
        return isinstance(value, int) and not isinstance(value, bool)

    final_by_request = {}
    for item in final_errors or ():
        if not isinstance(item, dict):
            continue
        request_id = item.get("requestId")
        if isinstance(request_id, str) and request_id:
            final_by_request[request_id] = item

    # Drop malformed entries up front so grouping only ever sees dicts.
    usable_failovers = [item for item in failovers or () if isinstance(item, dict)]

    exhausted = []
    for request_id, request_failovers in group_by_request_id(usable_failovers).items():
        final = final_by_request.get(request_id)
        if not final:
            continue
        # The most recent failover carries the latest switch counters.
        last = request_failovers[-1]
        switch_count = last.get("switchCount")
        max_switches = last.get("maxSwitches")
        final_status = final.get("statusCode")
        if not (is_count(switch_count) and is_count(max_switches) and is_count(final_status)):
            continue
        if max_switches <= 0 or switch_count < max_switches or final_status < 500:
            continue
        exhausted.append({
            "requestId": request_id,
            "clientRequestId": final.get("clientRequestId") or last.get("clientRequestId"),
            "path": final.get("path") or last.get("path"),
            "finalAccountId": final.get("accountId"),
            "finalStatusCode": final_status,
            "switchCount": switch_count,
            "maxSwitches": max_switches,
            "lastFailoverAccountId": last.get("accountId"),
            "lastUpstreamStatus": last.get("upstreamStatus"),
        })
    return exhausted
|
||||
|
||||
def recent_responses_gateway_evidence():
|
||||
proc = kubectl(["-n", NAMESPACE, "logs", "deployment/sub2api", "--since=6h", "--tail=2500"])
|
||||
stdout = proc.stdout.decode("utf-8", errors="replace")
|
||||
@@ -5189,6 +5236,7 @@ def recent_responses_gateway_evidence():
|
||||
visible_final_errors = filter_ignored_probe_noise(final_errors)
|
||||
visible_slow_final_errors = filter_ignored_probe_noise(slow_final_errors)
|
||||
visible_context_canceled = filter_ignored_probe_noise(context_canceled)
|
||||
failover_budget_exhausted = failover_budget_exhausted_evidence(visible_failovers, visible_final_errors)
|
||||
probe_noise = (
|
||||
ignored_probe_noise(failovers, "failovers")
|
||||
+ ignored_probe_noise(forward_failures, "forwardFailures")
|
||||
@@ -5207,6 +5255,7 @@ def recent_responses_gateway_evidence():
|
||||
"slowFinalErrorCount": len(visible_slow_final_errors),
|
||||
"contextCanceledCount": len(visible_context_canceled),
|
||||
"ignoredProbeNoiseCount": len(probe_noise),
|
||||
"failoverBudgetExhausted": failover_budget_exhausted[-8:],
|
||||
"rawCounts": {
|
||||
"failoverCount": len(failovers),
|
||||
"forwardFailureCount": len(forward_failures),
|
||||
@@ -5990,6 +6039,8 @@ def trace_reason(events, final_event):
|
||||
failovers = [item for item in events if item.get("type") == "failover"]
|
||||
select_failures = [item for item in events if item.get("type") == "select-failed"]
|
||||
upstream_errors = [item for item in events if item.get("type") == "upstream-error"]
|
||||
if failover_budget_exhausted(failovers, final_event):
|
||||
return "failover-budget-exhausted"
|
||||
if failovers and select_failures:
|
||||
return "failover-attempted-no-candidate"
|
||||
if failovers:
|
||||
@@ -6004,6 +6055,47 @@ def trace_reason(events, final_event):
|
||||
return "completed"
|
||||
return "unknown"
|
||||
|
||||
def failover_budget_exhausted(failovers, final_event):
    """Report whether a request ran out of failover switches and then failed.

    Args:
        failovers: Sequence of failover event dicts for one request; only
            the last entry is inspected.
        final_event: The final event dict for the request.

    Returns:
        True when the last failover has integer switchCount and maxSwitches
        with maxSwitches > 0 and switchCount >= maxSwitches, and the final
        event has an integer statusCode of 500 or above. False otherwise,
        including when either input is empty or malformed.
    """
    if not failovers or not isinstance(final_event, dict):
        return False
    last = failovers[-1]
    if not isinstance(last, dict):
        # Apply the same shape check to the failover entry as to final_event.
        return False
    switch_count = last.get("switchCount")
    max_switches = last.get("maxSwitches")
    final_status = final_event.get("statusCode")
    for value in (switch_count, max_switches, final_status):
        # bool is a subclass of int; a JSON true/false is not a counter.
        if not isinstance(value, int) or isinstance(value, bool):
            return False
    return 0 < max_switches <= switch_count and final_status >= 500
|
||||
|
||||
def trace_untried_schedulable_accounts(failovers, final_event, account_snapshot):
    """List schedulable accounts a budget-exhausted request never reached.

    Args:
        failovers: Sequence of failover event dicts for the traced request.
        final_event: The final event dict for the traced request.
        account_snapshot: List of account dicts. A missing or non-list
            snapshot is treated as empty.

    Returns:
        An empty list unless failover_budget_exhausted(failovers,
        final_event) is true. Otherwise one summary dict (accountId,
        accountName, priority, concurrency) for every snapshot account that
        has an integer accountId not seen in the failovers or the final
        event, has schedulable set to True, status equal to "active", and
        tempUnschedulableSet not set to True.
    """
    if not failover_budget_exhausted(failovers, final_event):
        return []

    # Every account the request touched, whether it failed over or finished.
    attempts = [item for item in failovers if isinstance(item, dict)]
    if isinstance(final_event, dict):
        attempts.append(final_event)
    tried = set()
    for item in attempts:
        account_id = item.get("accountId")
        if isinstance(account_id, int):
            tried.add(account_id)

    if not isinstance(account_snapshot, (list, tuple)):
        # No usable snapshot, so there is nothing to compare against.
        return []

    result = []
    for item in account_snapshot:
        if not isinstance(item, dict):
            continue
        account_id = item.get("accountId")
        if not isinstance(account_id, int) or account_id in tried:
            continue
        available = (
            item.get("schedulable") is True
            and item.get("status") == "active"
            and item.get("tempUnschedulableSet") is not True
        )
        if available:
            result.append({
                "accountId": account_id,
                "accountName": item.get("accountName"),
                "priority": item.get("priority"),
                "concurrency": item.get("concurrency"),
            })
    return result
|
||||
|
||||
def run_trace():
|
||||
payload = json.loads(base64.b64decode(PAYLOAD_B64).decode("utf-8")) if PAYLOAD_B64 else {}
|
||||
request_id = payload.get("requestId")
|
||||
@@ -6063,6 +6155,7 @@ def run_trace():
|
||||
final_errors = [item for item in window_events if item.get("type") == "final" and isinstance(item.get("statusCode"), int) and item.get("statusCode") >= 400]
|
||||
window_failovers = [item for item in window_events if item.get("type") == "failover"]
|
||||
window_select_failures = [item for item in window_events if item.get("type") == "select-failed"]
|
||||
untried_schedulable_accounts = trace_untried_schedulable_accounts(failovers, final_event or {}, account_snapshot)
|
||||
reason = trace_reason(events, final_event)
|
||||
if not matched:
|
||||
outcome = "not-found"
|
||||
@@ -6112,6 +6205,10 @@ def run_trace():
|
||||
"tempUnschedulableCount": len(temp_unsched),
|
||||
"adminSchedulableCount": len(admin_sched),
|
||||
},
|
||||
"diagnostics": {
|
||||
"failoverBudgetExhausted": failover_budget_exhausted(failovers, final_event or {}),
|
||||
"untriedSchedulableAccounts": untried_schedulable_accounts,
|
||||
},
|
||||
"accountSnapshot": account_snapshot,
|
||||
"accountSnapshotError": account_snapshot_error,
|
||||
"rawLines": [{"line": item.get("_line")} for item in matched[-30:]] if show_lines else [],
|
||||
|
||||
Reference in New Issue
Block a user