Merge pull request #1536 from pikasTech/fix/1534-status-read-timeout

Fix branch follower status read timeout
This commit is contained in:
Lyon
2026-07-04 21:43:42 +08:00
committed by GitHub
+25 -11
View File
@@ -2021,12 +2021,13 @@ function mergeFollowerStatus(
function readK8sState(registry: BranchFollowerRegistry, options: ParsedOptions): K8sStateRead {
const errors: string[] = [];
const stateResult = kubeConfigMapFollowerState(registry, options);
const readTimeoutMs = statusReadTimeoutMs(registry, options);
const stateResult = kubeConfigMapFollowerState(registry, options, readTimeoutMs);
const namespace = registry.controller.namespace;
const deploymentResult = kubeJson(registry, options, `kubectl -n ${shQuote(namespace)} get deploy ${shQuote(registry.controller.deploymentName)} -o json`, 10_000, `/apis/apps/v1/namespaces/${encodeURIComponent(namespace)}/deployments/${encodeURIComponent(registry.controller.deploymentName)}`);
const leaseResult = kubeJson(registry, options, `kubectl -n ${shQuote(namespace)} get lease ${shQuote(registry.controller.leaseName)} -o json`, 10_000, `/apis/coordination.k8s.io/v1/namespaces/${encodeURIComponent(namespace)}/leases/${encodeURIComponent(registry.controller.leaseName)}`);
const deploymentResult = kubeJson(registry, options, `kubectl -n ${shQuote(namespace)} get deploy ${shQuote(registry.controller.deploymentName)} -o json`, readTimeoutMs, `/apis/apps/v1/namespaces/${encodeURIComponent(namespace)}/deployments/${encodeURIComponent(registry.controller.deploymentName)}`);
const leaseResult = kubeJson(registry, options, `kubectl -n ${shQuote(namespace)} get lease ${shQuote(registry.controller.leaseName)} -o json`, readTimeoutMs, `/apis/coordination.k8s.io/v1/namespaces/${encodeURIComponent(namespace)}/leases/${encodeURIComponent(registry.controller.leaseName)}`);
const podSelector = labelSelector(registry.controller.labels);
const podsResult = kubePodList(registry, options, podSelector);
const podsResult = kubePodList(registry, options, podSelector, readTimeoutMs);
if (!stateResult.ok) errors.push(`state configmap: ${stateResult.error}`);
if (!deploymentResult.ok && !isNotFoundText(deploymentResult.error)) errors.push(`deployment: ${deploymentResult.error}`);
if (!leaseResult.ok && !isNotFoundText(leaseResult.error)) errors.push(`lease: ${leaseResult.error}`);
@@ -2044,7 +2045,11 @@ function readK8sState(registry: BranchFollowerRegistry, options: ParsedOptions):
};
}
function kubeConfigMapFollowerState(registry: BranchFollowerRegistry, options: ParsedOptions): K8sFollowerStateRead {
function statusReadTimeoutMs(registry: BranchFollowerRegistry, options: ParsedOptions): number {
return Math.max(1, options.timeoutSeconds ?? registry.controller.budgets.statusSeconds) * 1000;
}
function kubeConfigMapFollowerState(registry: BranchFollowerRegistry, options: ParsedOptions, timeoutMs: number): K8sFollowerStateRead {
const followers = options.followerId === null ? registry.followers : registry.followers.filter((follower) => follower.id === options.followerId);
const maxTimingStages = options.followerId === null ? 8 : 16;
const script = [
@@ -2060,10 +2065,10 @@ function kubeConfigMapFollowerState(registry: BranchFollowerRegistry, options: P
"export NAMESPACE CONFIGMAP FOLLOWERS_JSON MAX_TIMING_STAGES",
"node \"$tmpdir/read-state-summary.mjs\"",
].join("\n");
const result = runKubeScript(registry, options, script, "", 10_000);
const result = runKubeScript(registry, options, script, "", timeoutMs);
const parsed = result.exitCode === 0 ? parseJsonObject(result.stdout) : null;
if (parsed === null) {
const error = redactText(tailText(result.stderr || result.stdout, 800));
const error = commandErrorSummary(result, timeoutMs);
return { ok: false, stateByFollower: {}, present: false, error };
}
const parsedStates = asOptionalRecord(parsed.stateByFollower) ?? {};
@@ -2158,11 +2163,11 @@ function kubeJson(registry: BranchFollowerRegistry, options: ParsedOptions, comm
return {
ok: result.exitCode === 0 && value !== null,
value,
error: redactText(tailText(result.stderr || result.stdout, 800)),
error: value === null ? commandErrorSummary(result, timeoutMs) : "",
};
}
function kubePodList(registry: BranchFollowerRegistry, options: ParsedOptions, selector: string): { ok: boolean; value: Record<string, unknown> | null; error: string } {
function kubePodList(registry: BranchFollowerRegistry, options: ParsedOptions, selector: string, timeoutMs: number): { ok: boolean; value: Record<string, unknown> | null; error: string } {
const command = `kubectl -n ${shQuote(registry.controller.namespace)} get pods -l ${shQuote(selector)} -o name`;
const script = options.inCluster
? [
@@ -2174,7 +2179,7 @@ function kubePodList(registry: BranchFollowerRegistry, options: ParsedOptions, s
`node "$tmpdir/kube-get.mjs" ${shQuote(`/api/v1/namespaces/${encodeURIComponent(registry.controller.namespace)}/pods?labelSelector=${encodeURIComponent(selector)}`)}`,
].join("\n")
: `set -eu\n${command}`;
const result = runKubeScript(registry, options, script, "", 10_000);
const result = runKubeScript(registry, options, script, "", timeoutMs);
const parsed = options.inCluster && result.exitCode === 0 ? parseJsonObject(result.stdout) : null;
const names = options.inCluster
? arrayRecords(parsed?.items).map((item) => stringOrNull(asOptionalRecord(item.metadata)?.name)).filter((name): name is string => name !== null)
@@ -2182,10 +2187,19 @@ function kubePodList(registry: BranchFollowerRegistry, options: ParsedOptions, s
return {
ok: result.exitCode === 0,
value: result.exitCode === 0 ? { items: names.map((name) => ({ metadata: { name } })) } : null,
error: redactText(tailText(result.stderr || result.stdout, 800)),
error: result.exitCode === 0 ? "" : commandErrorSummary(result, timeoutMs),
};
}
function commandErrorSummary(result: CommandResult, timeoutMs: number): string {
const tail = redactText(tailText(result.stderr || result.stdout, 800));
return [
result.timedOut ? `timedOut=true timeoutMs=${timeoutMs}` : null,
result.exitCode === null ? "exitCode=null" : result.exitCode === 0 ? null : `exitCode=${result.exitCode}`,
tail.length > 0 ? tail : null,
].filter((item): item is string => item !== null).join("; ") || "empty stdout/stderr";
}
function runKubeScript(registry: BranchFollowerRegistry, options: ParsedOptions, script: string, input: string, timeoutMs: number): CommandResult {
if (options.inCluster) {
return runCommand(["sh", "-lc", script], repoRoot, { input, timeoutMs });