fix: serialize concurrent tran session opens
This commit is contained in:
@@ -125,6 +125,8 @@ exec bun /root/unidesk/scripts/cli.ts ssh "$@"
|
||||
|
||||
`tran` 同样遵守 route/operation 解析器;route 后面的第一个 token 不是原生 ssh 命令字符串。不要写 `tran G14:/root/hwlab sh -lc '...'`,因为 `sh` 会被解析为 stdin script helper 的别名,`-lc` 会变成不受支持的 script 选项。带变量展开、管道、重定向或多条命令的远端逻辑,默认使用 `tran G14:/root/hwlab script <<'SCRIPT'`;默认 `script` 走目标节点 `/bin/sh`,并继承 provider-gateway/G14 已长期化的 proxy 环境。只有脚本确实使用 `pipefail`、数组、`[[ ... ]]` 等 bash 专有语义时才加 `--shell bash`,不能把 `--shell bash` 当作 proxy 修复手段。单进程命令才直接写成 argv,例如 `tran G14:/root/hwlab git status --short --branch`。遇到分布式开发摩擦时,优先补强 `tran` 的 route/operation、stdin helper 或目标节点环境,并把稳定解法写回长期参考文档,不要退回多层 shell 字符串拼接。
|
||||
|
||||
`tran` wrapper 会在打开 provider SSH session 前,对同一个 provider/plane 的非交互调用做本机文件锁串行化。该锁只覆盖 `tran <route> <operation> ...` 这类短命令,不覆盖 `tran <route>` 交互 shell,目的是避免 Codex 并发读文件或并发小命令同时冲击同一个 provider 的 session allocator,导致所有调用在 `provider session` 打开阶段超时。锁目录默认是 `/tmp/unidesk-tran-locks`,可用 `UNIDESK_TRAN_LOCK_DIR` 调整;等待超过 `UNIDESK_TRAN_LOCK_NOTICE_SECONDS` 会在 stderr 提示正在排队,超过 `UNIDESK_TRAN_LOCK_WARNING_SECONDS` 会提示高频分布式调用正在排队,超过 `UNIDESK_TRAN_LOCK_TIMEOUT_SECONDS` 会失败。只有排查锁本身或验证底层并发能力时才允许临时设置 `UNIDESK_TRAN_SESSION_LOCK=0`,普通分布式开发不得绕过该锁。
|
||||
|
||||
`bun scripts/cli.ts ssh D518` 应表现为登录 D518 WSL 的 shell;`bun scripts/cli.ts ssh D518 hostname` 应像 `ssh D518 hostname` 一样只输出远端命令结果并返回远端 exit code。Provider ID 前的目标选择由 UniDesk 节点清单决定,`-p`、`-i`、`-l`、`-o` 等传统 ssh 传输参数由 provider-gateway 部署配置统一管理,CLI 会兼容性消费这些参数但不会覆盖节点侧维护桥配置。指挥官、CI 预检和其他非交互流程不要依赖 ssh-like 自由拼接;单进程标准写法是 `bun scripts/cli.ts ssh D601 argv true`,多行 shell 逻辑标准写法是 quoted heredoc 单步调用 `bun scripts/cli.ts ssh D601 script <<'SCRIPT'`。
|
||||
|
||||
core 只允许声明了 `host.ssh` capability 的 provider 使用 `ssh` 透传或 `host.ssh` dispatch;旧 provider 不支持该能力时必须快速失败并输出错误,不能把未知命令误判成 `echo` 成功。
|
||||
|
||||
@@ -179,6 +179,7 @@ export function sshHelp(): unknown {
|
||||
"Do not use post-provider shorthand such as `ssh G14 k3s ...`; write `ssh G14:k3s ...` so location and operation stay separated.",
|
||||
"If an ssh-like remote command fails with timeout/kex/exit-255 friction, stderr includes one low-noise UNIDESK_SSH_HINT JSON line with the argv retry command.",
|
||||
"Every ssh/tran runtime writes one UNIDESK_SSH_TIMING JSON line to stderr with elapsedMs/elapsedSeconds; operations over 10s are marked level=warning and should be checked for provider latency, remote command cost, helper bootstrap, or tran/apply-patch optimization before repeating high-frequency work.",
|
||||
"The local tran wrapper serializes non-interactive calls per provider/plane before opening provider SSH sessions, so parallel Codex file reads do not stampede the provider session allocator; set UNIDESK_TRAN_SESSION_LOCK=0 only for explicit diagnostics.",
|
||||
"Use -- before a remote command that intentionally starts with a dash.",
|
||||
],
|
||||
};
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { spawnSync } from "node:child_process";
|
||||
import { mkdtempSync, readFileSync, rmSync, writeFileSync } from "node:fs";
|
||||
import { chmodSync, mkdirSync, mkdtempSync, readFileSync, rmSync, writeFileSync } from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { sshHelp } from "./src/help";
|
||||
@@ -53,6 +53,57 @@ function applyPatchFixture(args: string[], patch: string, files: Record<string,
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Launches two `scripts/tran` invocations for the same route in parallel and
 * reports how they finished.
 *
 * The fixture builds a throwaway directory holding:
 * - a fake repo root (`repo/scripts/cli.ts`) handed to the wrapper through
 *   `UNIDESK_TRAN_REPO_ROOT`;
 * - a fake `bun` placed first on `PATH`. It claims `$FAKE_BUN_RUN_LOCK` with
 *   `mkdir`, sleeps one second, releases it and exits 0. If the directory is
 *   already present it prints "fake bun observed overlapping tran execution"
 *   to stderr and exits 42, which is how overlapping executions are detected;
 * - a capture directory for the per-invocation stdout/stderr files.
 *
 * All scratch files live under the private temp root so that concurrent runs
 * of this contract cannot overwrite each other's captures and nothing is left
 * behind in the shared temp directory.
 *
 * @returns The `spawnSync` result of the driver shell: `stdout` is a single
 * line "<exit code of first tran> <exit code of second tran>", and `stderr`
 * is the concatenated stderr of both invocations.
 */
function tranConcurrentLockFixture(): { status: number | null; stdout: string; stderr: string } {
  const root = mkdtempSync(path.join(os.tmpdir(), "unidesk-tran-lock-contract-"));
  try {
    const fakeRepo = path.join(root, "repo");
    const fakeScripts = path.join(fakeRepo, "scripts");
    const fakeBin = path.join(root, "bin");
    const captureDir = path.join(root, "capture");
    mkdirSync(fakeScripts, { recursive: true });
    mkdirSync(fakeBin, { recursive: true });
    mkdirSync(captureDir, { recursive: true });
    writeFileSync(path.join(fakeScripts, "cli.ts"), "// fake cli entry for tran wrapper contract\n", "utf8");
    const fakeBun = path.join(fakeBin, "bun");
    writeFileSync(fakeBun, [
      "#!/bin/sh",
      "if mkdir \"$FAKE_BUN_RUN_LOCK\" 2>/dev/null; then",
      "  sleep 1",
      "  rmdir \"$FAKE_BUN_RUN_LOCK\"",
      "  exit 0",
      "fi",
      "echo fake bun observed overlapping tran execution >&2",
      "exit 42",
      "",
    ].join("\n"), "utf8");
    chmodSync(fakeBun, 0o755);
    const tranPath = path.resolve("scripts/tran");
    // The capture directory is passed through the environment instead of being
    // interpolated into the script text, so no shell quoting of the temp path
    // is required.
    return spawnSync("sh", ["-c", [
      `"${tranPath}" D601:/tmp pwd >"$TRAN_LOCK_CAPTURE_DIR/one.out" 2>"$TRAN_LOCK_CAPTURE_DIR/one.err" &`,
      "p1=$!",
      `"${tranPath}" D601:/tmp pwd >"$TRAN_LOCK_CAPTURE_DIR/two.out" 2>"$TRAN_LOCK_CAPTURE_DIR/two.err" &`,
      "p2=$!",
      "wait $p1; s1=$?",
      "wait $p2; s2=$?",
      "cat \"$TRAN_LOCK_CAPTURE_DIR/one.err\" \"$TRAN_LOCK_CAPTURE_DIR/two.err\" >&2",
      "printf '%s %s\\n' \"$s1\" \"$s2\"",
    ].join("\n")], {
      cwd: path.resolve("."),
      env: {
        ...process.env,
        PATH: `${fakeBin}${path.delimiter}${process.env.PATH ?? ""}`,
        UNIDESK_TRAN_REPO_ROOT: fakeRepo,
        UNIDESK_TRAN_LOCK_DIR: path.join(root, "locks"),
        UNIDESK_TRAN_LOCK_NOTICE_SECONDS: "0",
        UNIDESK_TRAN_LOCK_TIMEOUT_SECONDS: "10",
        FAKE_BUN_RUN_LOCK: path.join(root, "fake-bun-running"),
        TRAN_LOCK_CAPTURE_DIR: captureDir,
      },
      encoding: "utf8",
      timeout: 10_000,
    });
  } finally {
    // Removes the fake repo, fake bun, lock directory and captures in one go.
    rmSync(root, { recursive: true, force: true });
  }
}
|
||||
|
||||
export function runSshArgvGuidanceContract(): JsonRecord {
|
||||
const argv = parseSshArgs(["argv", "true"]);
|
||||
assertCondition(argv.invocationKind === "argv", "argv subcommand must be classified as argv", argv);
|
||||
@@ -321,6 +372,7 @@ export function runSshArgvGuidanceContract(): JsonRecord {
|
||||
assertCondition(helpText.includes("ssh D601:k3s:hwlab-dev:hwlab-cloud-api script <<'SCRIPT'"), "ssh help must document k3s script operation", helpText);
|
||||
assertCondition(helpText.includes("UNIDESK_SSH_HINT"), "ssh help must document structured failure hint", helpText);
|
||||
assertCondition(helpText.includes("UNIDESK_SSH_TIMING") && helpText.includes("10s"), "ssh help must document runtime timing hints", helpText);
|
||||
assertCondition(helpText.includes("UNIDESK_TRAN_SESSION_LOCK=0") && helpText.includes("provider session allocator"), "ssh help must document tran provider session serialization", helpText);
|
||||
|
||||
const crossChecks = providerTriageRecommendedCrossChecks("D601");
|
||||
assertCondition(crossChecks.includes("bun scripts/cli.ts ssh D601 argv true"), "provider triage cross-checks must keep argv true", crossChecks);
|
||||
@@ -346,6 +398,11 @@ export function runSshArgvGuidanceContract(): JsonRecord {
|
||||
const tranScript = readFileSync(new URL("./tran", import.meta.url), "utf8");
|
||||
assertCondition(tranScript.includes("CODE_QUEUE_DEV_CONTAINER_MASTER_HOST") && tranScript.includes("--main-server-ip"), "tran wrapper must auto-select frontend transport inside Code Queue runner pods", tranScript);
|
||||
assertCondition(tranScript.includes("UNIDESK_TRAN_LOCAL"), "tran wrapper must keep an explicit local override for diagnostics", tranScript);
|
||||
assertCondition(tranScript.includes("tran_lock_scope") && tranScript.includes("UNIDESK_TRAN_LOCK_DIR"), "tran wrapper must serialize concurrent provider session opens with a local sh lock", tranScript);
|
||||
const tranLock = tranConcurrentLockFixture();
|
||||
assertCondition(tranLock.status === 0, "tran lock fixture shell should complete", tranLock);
|
||||
assertCondition(tranLock.stdout.trim() === "0 0", "parallel tran invocations for one provider must serialize instead of overlapping fake bun", tranLock);
|
||||
assertCondition(!tranLock.stderr.includes("overlapping tran execution"), "tran provider lock must prevent overlapping provider session allocation", tranLock);
|
||||
|
||||
const remoteSource = readFileSync(new URL("./src/remote.ts", import.meta.url), "utf8");
|
||||
assertCondition(remoteSource.includes("UNIDESK_REMOTE_HTTP_CLIENT") && remoteSource.includes("isCodeQueueRunnerEnv(env) ? \"curl\" : \"fetch\""), "remote frontend transport must default to curl HTTP in Code Queue runner environments", remoteSource);
|
||||
@@ -389,6 +446,7 @@ export function runSshArgvGuidanceContract(): JsonRecord {
|
||||
"host apply-patch bootstraps only the apply_patch helper and uses a Perl fast path for large files",
|
||||
"remote frontend ssh uses authenticated /ws/ssh streaming instead of host.ssh dispatch task polling",
|
||||
"Code Queue runner image installs the tran wrapper and runner tran auto-selects remote frontend transport",
|
||||
"tran serializes concurrent non-interactive calls per provider/plane before opening provider SSH sessions",
|
||||
"Code Queue runner remote frontend HTTP uses curl by default for non-ssh API calls to avoid Bun response-body native crashes",
|
||||
],
|
||||
};
|
||||
|
||||
+58
-3
@@ -13,8 +13,63 @@ if [ -n "${CODE_QUEUE_SERVICE_ROLE:-}" ] || [ -n "${CODE_QUEUE_INSTANCE_ID:-}" ]
|
||||
runner_env=1
|
||||
fi
|
||||
|
||||
if [ "$runner_env" = 1 ] && [ -n "$host" ] && [ "${UNIDESK_TRAN_LOCAL:-}" != "1" ]; then
|
||||
exec bun "$repo/scripts/cli.ts" --main-server-ip "$host" ssh "$@"
|
||||
# Print the lock scope ("<provider>-<plane>") for a tran invocation, or return
# non-zero when the invocation must not be serialized.
#
# Arguments: the tran argv, i.e. "<route> <operation> ...".
# No scope is produced (status 1) when:
#   - fewer than two arguments are given (a bare "tran <route>" call),
#   - UNIDESK_TRAN_SESSION_LOCK is one of 0/false/FALSE/no/NO/off/OFF,
#   - the route is empty, starts with "-", or has an empty provider part.
# The provider is the route text before the first ":"; the plane is "k3s" when
# the route contains ":k3s", otherwise "host".
tran_lock_scope() {
  if [ "$#" -lt 2 ]; then
    return 1
  fi

  case "${UNIDESK_TRAN_SESSION_LOCK:-1}" in
    0 | false | FALSE | no | NO | off | OFF)
      return 1
      ;;
  esac

  route=$1
  if [ -z "$route" ]; then
    return 1
  fi
  case "$route" in
    -*)
      return 1
      ;;
  esac

  provider=${route%%:*}
  if [ -z "$provider" ]; then
    return 1
  fi

  case "$route" in
    *:k3s*)
      plane=k3s
      ;;
    *)
      plane=host
      ;;
  esac

  printf '%s-%s\n' "$provider" "$plane"
}
|
||||
|
||||
# Acquire the local tran session lock for one scope, blocking until it is free.
#
# Arguments:
#   $1  lock scope, as printed by tran_lock_scope (e.g. "D601-host").
# Environment:
#   UNIDESK_TRAN_LOCK_DIR              lock directory (default /tmp/unidesk-tran-locks)
#   UNIDESK_TRAN_LOCK_NOTICE_SECONDS   wait before the one-time "waiting" notice (default 3)
#   UNIDESK_TRAN_LOCK_WARNING_SECONDS  wait before the one-time queueing warning (default 10)
#   UNIDESK_TRAN_LOCK_TIMEOUT_SECONDS  wait before giving up (default 120)
# The lock is a directory created with mkdir, which either creates it or fails,
# so only one caller can hold it. On success the function installs traps that
# remove the directory when the script exits. If the lock directory cannot be
# prepared, or the timeout elapses, it prints a message to stderr and exits the
# script with status 255.
tran_acquire_lock() {
  scope=$1
  lock_root=${UNIDESK_TRAN_LOCK_DIR:-/tmp/unidesk-tran-locks}
  # Anything outside [A-Za-z0-9_.-] becomes "_" so the scope is a safe file name.
  lock_name=$(printf '%s' "$scope" | tr -c 'A-Za-z0-9_.-' '_')
  lock_path=$lock_root/$lock_name.lock
  notice_seconds=${UNIDESK_TRAN_LOCK_NOTICE_SECONDS:-3}
  warning_seconds=${UNIDESK_TRAN_LOCK_WARNING_SECONDS:-10}
  timeout_seconds=${UNIDESK_TRAN_LOCK_TIMEOUT_SECONDS:-120}

  # A non-numeric threshold would make every "[ ... -ge ... ]" test below fail
  # with an error, so the timeout would never trigger and the loop would spin
  # forever. Fall back to the documented defaults instead.
  case "$notice_seconds" in
    *[!0-9]*) notice_seconds=3 ;;
  esac
  case "$warning_seconds" in
    *[!0-9]*) warning_seconds=10 ;;
  esac
  case "$timeout_seconds" in
    *[!0-9]*) timeout_seconds=120 ;;
  esac

  # Fail fast when the lock directory cannot be prepared; otherwise the caller
  # would wait the whole timeout and then see a misleading "lock timeout".
  if ! mkdir -p "$lock_root"; then
    printf 'tran provider session lock directory unavailable scope=%s dir=%s\n' "$scope" "$lock_root" >&2
    exit 255
  fi

  started=$(date +%s)
  noticed=0
  warned=0
  while ! mkdir "$lock_path" 2>/dev/null; do
    now=$(date +%s)
    waited=$((now - started))
    if [ "$noticed" = 0 ] && [ "$waited" -ge "$notice_seconds" ]; then
      printf 'tran provider session lock waiting scope=%s waited=%ss; serializing concurrent opens to avoid provider session allocation timeouts\n' "$scope" "$waited" >&2
      noticed=1
    fi
    if [ "$warned" = 0 ] && [ "$waited" -ge "$warning_seconds" ]; then
      printf 'tran provider session lock warning scope=%s waited=%ss; high-frequency distributed calls are queued behind another tran, consider batching reads or checking stuck sessions if this repeats\n' "$scope" "$waited" >&2
      warned=1
    fi
    if [ "$waited" -ge "$timeout_seconds" ]; then
      printf 'tran provider session lock timeout scope=%s waited=%ss lock=%s\n' "$scope" "$waited" "$lock_path" >&2
      exit 255
    fi
    sleep 1
  done

  # Release the lock on every exit path. Some /bin/sh implementations (dash,
  # for example) skip the EXIT trap when the shell dies from an untrapped
  # signal, which would leave the lock directory behind and stall later
  # callers until their timeout. Turning HUP/INT/TERM into a normal exit makes
  # the EXIT trap run. The traps are installed only after the lock is held, so
  # an interrupted waiter never removes someone else's lock.
  trap 'rmdir "$lock_path" 2>/dev/null || true' EXIT
  trap 'exit 129' HUP
  trap 'exit 130' INT
  trap 'exit 143' TERM
}
|
||||
|
||||
if scope=$(tran_lock_scope "$@"); then
|
||||
tran_acquire_lock "$scope"
|
||||
fi
|
||||
|
||||
exec bun "$repo/scripts/cli.ts" ssh "$@"
|
||||
if [ "$runner_env" = 1 ] && [ -n "$host" ] && [ "${UNIDESK_TRAN_LOCAL:-}" != "1" ]; then
|
||||
bun "$repo/scripts/cli.ts" --main-server-ip "$host" ssh "$@"
|
||||
exit $?
|
||||
fi
|
||||
|
||||
bun "$repo/scripts/cli.ts" ssh "$@"
|
||||
|
||||
Reference in New Issue
Block a user