Merge pull request #172 from pikasTech/fix/provider-gateway-promoted-heartbeat

fix: enforce final provider upgrade heartbeat
Lyon
2026-05-24 13:43:02 +08:00
committed by GitHub
6 changed files with 766 additions and 22 deletions
+6 -4
@@ -10,7 +10,7 @@ Provider Gateway is the container on the compute-node side. It only initiates outbound connections to the main server's …
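The authoritative path for rebuilding and upgrading the `provider-gateway` container on a compute node is `provider.upgrade` with `mode: "schedule"`, or the equivalent explicit upgrade scheduling in the frontend. On this path the online provider starts a detached updater container through the local Docker socket, so that the upgrade is decoupled from the lifecycle of the current WebSocket and SSH passthrough sessions. The rebuild target must be the `provider-gateway` service only and must carry `--no-deps` and `--force-recreate`; it must not affect database, backend-core, frontend, or business user services, and it must not become a no-op merely because the image tag is unchanged.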
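Remote upgrades must use sleep-and-validate rollback protection: after successfully scheduling the updater, the old gateway closes its current WebSocket and enters a sleep period of at most 5 minutes; the updater first builds the new image, then starts a candidate gateway using the old container's environment variables, mounts, networks, and `extra_hosts`; only after `connect_open` and a successful register ack appear in the candidate gateway's log may the old service container and the candidate container be removed and the final provider-gateway container be recreated from the original Compose service. The final container must re-verify `restart=always` and `pid=host`, so that the candidate's temporary container name or port mappings do not linger as a second running path. If candidate validation fails, the updater must remove the candidate container and exit with failure; the old gateway automatically reconnects to the main server once it reaches the sleep limit, which forms an automatic rollback. After a new WebSocket replaces the connection for the same Provider ID, backend-core must ignore the old WebSocket's close event, so that a candidate that is already online is not marked offline when the old connection closes.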
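Remote upgrades must use sleep-and-validate rollback protection: after successfully scheduling the updater, the old gateway closes its current WebSocket and enters a sleep period of at most 5 minutes; the updater first builds the new image, then starts a candidate gateway using the old container's environment variables, mounts, networks, and `extra_hosts`; only after `connect_open` and a successful register ack appear in the candidate gateway's log may the old service container and the candidate container be removed and the final provider-gateway container be recreated from the original Compose service. Successful candidate validation is not the terminal state of the upgrade: the final Compose container must locally re-verify `restart=always`, `pid=host`, and the backend-core reconnect, and the `provider.upgrade` task may enter `succeeded` only after backend-core has also seen the same Provider ID, the target `providerGatewayVersion`, a non-candidate container name, and a sufficiently recent real heartbeat (`providerGatewayHeartbeatAt`). If the final container does not reconnect within the window, the updater should first restore the last-known-good container, and backend-core must record the task as a structured failure/blocker; an earlier candidate success must not be treated as final success. After a new WebSocket replaces the connection for the same Provider ID, backend-core must ignore the old WebSocket's close event, so that a candidate that is already online is not marked offline when the old connection closes.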
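The success conditions listed above can be read as a single predicate over the latest heartbeat. The following is a minimal sketch using only the standard library, not the backend-core implementation: the `ObservedHeartbeat` struct, its field names, and the millisecond timestamps are assumptions made for illustration, while the `-candidate-` substring check mirrors the candidate container naming used in this change.

```rust
/// Hypothetical, simplified view of the latest heartbeat stored for a provider.
#[derive(Debug, Clone)]
struct ObservedHeartbeat {
    provider_id: String,
    gateway_version: String,
    container_name: String,
    restart_policy: String,
    pid_mode: String,
    /// Milliseconds since the Unix epoch (assumed representation).
    heartbeat_at_ms: u64,
}

/// Returns true only when every final-success condition holds at once.
fn is_final_promoted_success(
    hb: &ObservedHeartbeat,
    provider_id: &str,
    target_version: &str,
    min_heartbeat_at_ms: u64,
) -> bool {
    // A candidate validation container never counts as the final container.
    let is_candidate = hb.container_name.contains("-candidate-");
    hb.provider_id == provider_id
        && hb.gateway_version == target_version
        && !is_candidate
        && hb.heartbeat_at_ms >= min_heartbeat_at_ms
        && hb.restart_policy == "always"
        && hb.pid_mode == "host"
}
```

Under these assumptions, a heartbeat from a container whose name contains `-candidate-` is rejected even if it already reports the target version, which is the point of the rule that candidate success is not final success.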
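It is forbidden to synchronously run self-rebuild commands through UniDesk's own Host SSH / WSL SSH passthrough, such as `docker compose up -d --build provider-gateway`, `docker compose restart provider-gateway`, or `docker rm -f <provider-gateway>` followed by a restart. The reason is that this SSH passthrough connection is carried by the very `provider-gateway` container that is being rebuilt; once the old container stops, the control channel is cut, which can leave the node unreachable with the old container stopped and the new one not yet started. SSH passthrough is allowed only for diagnosis, fixing upgrade prerequisites, inspecting local state, and post-upgrade verification; it is not allowed as the official rebuild/upgrade channel for a compute node's `provider-gateway`.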
@@ -128,11 +128,13 @@ After connecting successfully, provider-gateway must periodically report the node's CPU, memory, …
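backend-core can dispatch `provider.upgrade` to an online provider through real WebSocket scheduling. `mode: "plan"` only returns the upgrade plan and is used for E2E and manual pre-checks; `mode: "schedule"` asks provider-gateway to start a detached updater container through the local Docker socket. The updater's fixed strategy is to run `docker compose build provider-gateway` first; after a successful build it removes the old provider-gateway container, selected only by the `com.docker.compose.project` and `com.docker.compose.service=provider-gateway` labels, and finally runs `docker compose up -d --no-deps --force-recreate provider-gateway`. `--no-deps` is mandatory, and `--force-recreate` ensures that the provider re-registers its capability labels and avoids a Compose no-op; upgrading provider-gateway must not rebuild or stop database, backend-core, or frontend. For compute nodes such as D518 and D601, `mode: "schedule"` is the only standard path for officially rebuilding/upgrading the `provider-gateway` container; the upgrade execution path uses the Docker socket and a read-only repository mount, and does not use the Host SSH maintenance bridge as an automatic scheduling channel.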
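As an illustration of the fixed build-then-recreate order described above, the sketch below assembles the two Compose invocations as argv vectors. It is only a sketch: the helper names and the `compose_file` and `project` parameters are hypothetical, the label-based removal of the old container is omitted, and nothing here is taken from the real updater script.

```rust
/// Builds one `docker compose` invocation as an argv vector.
/// `compose_file` and `project` are hypothetical parameters for illustration.
fn compose_argv(compose_file: &str, project: &str, tail: &[&str]) -> Vec<String> {
    let mut argv: Vec<String> = ["docker", "compose", "-f", compose_file, "-p", project]
        .iter()
        .map(|part| part.to_string())
        .collect();
    argv.extend(tail.iter().map(|part| part.to_string()));
    argv
}

/// Returns the build step followed by the recreate step, both limited to
/// the provider-gateway service so that no other service is touched.
fn updater_compose_commands(compose_file: &str, project: &str) -> Vec<Vec<String>> {
    vec![
        compose_argv(compose_file, project, &["build", "provider-gateway"]),
        compose_argv(
            compose_file,
            project,
            &["up", "-d", "--no-deps", "--force-recreate", "provider-gateway"],
        ),
    ]
}
```

Keeping the service name as the last argument of both commands makes it easy to assert in a test that neither step can widen its scope beyond `provider-gateway`.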
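Nodes that use a compact build context must treat that context as a derived cache and must not let it become the source of truth for the version. Nodes such as D601 may have Compose build a fallback image from `.state/provider-<ID>-build-context` in order to reuse local base images and shrink the build input; before `docker compose build`, `provider.upgrade mode=schedule` must refresh that context from the read-only `/workspace/src/components/provider-gateway` and `/workspace/src/components/shared`, and mount the host `.state` as writable only into the detached updater. Acceptance must not rely only on `Built` or `candidate provider-gateway validated and promoted` in the updater log; it must be based on the `providerGatewayVersion`, `unideskCapabilities`, and target capability fields observable on the main server. If the old version is still reported online, the running image was not actually updated, and the build context must be fixed before the standard upgrade is rerun.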
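Nodes that use a compact build context must treat that context as a derived cache and must not let it become the source of truth for the version. Nodes such as D601 may have Compose build a fallback image from `.state/provider-<ID>-build-context` in order to reuse local base images and shrink the build input; before `docker compose build`, `provider.upgrade mode=schedule` must refresh that context from the read-only `/workspace/src/components/provider-gateway` and `/workspace/src/components/shared`, and mount the host `.state` as writable only into the detached updater. Acceptance must not rely only on `Built`, candidate reconnect, or local promote text in the updater log; it must be based on the final non-candidate Compose container's `providerGatewayVersion`, `unideskCapabilities`, `containerId`, `restartPolicy`, `pidMode`, and `heartbeatTimestamp` in the backend-core task result. If the old version is still reported online, the running image was not actually updated, and the build context must be fixed before the standard upgrade is rerun.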
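The remote upgrade policy is fixed to always-enabled: as long as provider-gateway is online and declares `provider.upgrade`, `mode: "schedule"` must actually schedule the upgrade container and must not be disabled by `PROVIDER_UPGRADE_ENABLED=false`, a hidden frontend button, or a server-side special list. The security boundary of the upgrade capability is not a switch but the explicit `PROVIDER_UPGRADE_*` configuration, Docker socket permissions, the read-only repository mount, the fixed Compose service, and the `--no-deps` constraint. The upgrade plan must show `policy: "always-enabled"`, the updater container name, runner image, workspace, Compose project/service, env file, compose file, and the actual `docker run` command, so that the frontend task history and CLI debug can diagnose problems directly.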
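A successful return from `mode: "schedule"` only means that the updater has been scheduled; the final outcome of the upgrade is decided by the candidate gateway's self-validation. The updater must first build the new image according to Compose, then generate the candidate env-file from the old container's `Config.Env`, and reuse the old container's Docker socket, log directory, read-only SSH private key mount, Compose networks, and `extra_hosts`. When the candidate container starts, its restart policy must initially be `no`, and it must explicitly use `--pid host` to keep node-level process resource collection; only after validation passes may the policy be changed to `always` and the old container removed. The `replacementStrategy` in the upgrade plan must include `oldGatewaySleepMs`, `validationTimeoutMs`, `promoteOnlyAfterCandidateValidation`, `candidateRestartPolicyAfterPromotion: "always"`, `candidateUsesOldContainerEnvironment`, `candidateUsesOldContainerMounts`, `candidateUsesOldContainerNetworks`, `candidateUsesOldContainerExtraHosts`, and `candidateUsesHostPidNamespace`, and the plan must show the current/target gateway version numbers of the specified Provider, so that the frontend and CLI can tell that this is not the old, dangerous flow of removing the old container first and then running `up`.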
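After `mode: "schedule"` has been scheduled successfully, provider-gateway may only advance the task to `running` and must not return a terminal success; backend-core must take over the final terminal-state decision. The updater must first build the new image according to Compose, then generate the candidate env-file from the old container's `Config.Env`, and reuse the old container's Docker socket, log directory, read-only SSH private key mount, Compose networks, and `extra_hosts`. When the candidate container starts, its restart policy must initially be `no`, and it must explicitly use `--pid host` to keep node-level process resource collection. A candidate that passes the log-level register ack has still only met a precondition for promotion; final success must wait until the non-candidate Compose container created by `docker compose up -d --no-deps --force-recreate provider-gateway` reports a target-version heartbeat to backend-core. The `replacementStrategy` in the upgrade plan must include `oldGatewaySleepMs`, `validationTimeoutMs`, `finalPromotedReconnectTimeoutMs`, `finalPromotedHeartbeatTimeoutMs`, `promoteOnlyAfterCandidateValidation`, `candidateValidationIsNotTerminalSuccess`, `finalPromotedComposeHeartbeatRequired`, `candidateUsesOldContainerEnvironment`, `candidateUsesOldContainerMounts`, `candidateUsesOldContainerNetworks`, `candidateUsesOldContainerExtraHosts`, and `candidateUsesHostPidNamespace`, and the plan must show the current/target gateway version numbers of the specified Provider, so that the frontend and CLI can tell that this is not the old, dangerous flow of removing the old container first and then running `up`.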
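The division of responsibility above (provider-gateway stops at `running`, backend-core decides the terminal state) can be sketched as a small decision function. This is a simplified illustration rather than the backend-core code: the boolean inputs and millisecond clock values are assumptions, and only the two failure reason strings are taken from this change.

```rust
/// Possible outcomes while backend-core waits for the final container.
#[derive(Debug, PartialEq)]
enum UpgradeDecision {
    /// Keep the task in `running` and poll again later.
    Pending,
    Succeeded,
    /// Terminal failure with a structured reason string.
    Failed(&'static str),
}

/// Decides the task state from what has been observed so far.
/// `final_heartbeat_confirmed` means a fresh, target-version heartbeat from a
/// non-candidate container was seen; `runtime_guard_ok` means that heartbeat
/// reported restart policy `always` and PID mode `host`.
fn decide_upgrade(
    final_heartbeat_confirmed: bool,
    runtime_guard_ok: bool,
    now_ms: u64,
    deadline_ms: u64,
) -> UpgradeDecision {
    if final_heartbeat_confirmed {
        return if runtime_guard_ok {
            UpgradeDecision::Succeeded
        } else {
            UpgradeDecision::Failed("final-promoted-runtime-guard-failed")
        };
    }
    if now_ms < deadline_ms {
        UpgradeDecision::Pending
    } else {
        UpgradeDecision::Failed("final-promoted-heartbeat-timeout")
    }
}
```

In this sketch a candidate heartbeat simply leaves `final_heartbeat_confirmed` false, so the task stays pending until the deadline and then fails with the timeout reason.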
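The terminal result of `provider.upgrade` must contain the final Compose container's `containerId`, `version`, `restartPolicy`, `pidMode`, and `heartbeatTimestamp`; when a field is unavailable, a stable `*UnavailableReason` must be given, and the field must not be omitted or filled with an empty string to disguise success. If, within the window, backend-core observes only a candidate container heartbeat, an old-version heartbeat, an offline status, or a missing heartbeat, the task must be `failed`, and `reason` must use structured blocker semantics and include last-known-good/rollback semantics, indicating that restoring the old container or rolling back is the preferred remediation.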
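The `*UnavailableReason` rule can be illustrated with a small helper that never drops a field. This is a sketch under stated assumptions: it models the result as a list of key/value pairs with the standard library only (the actual change builds a JSON object), and the name `result_field` is hypothetical.

```rust
/// Emits either the field itself, or a null field plus `<key>UnavailableReason`.
/// A `None` in the value position stands for JSON `null` in this sketch.
fn result_field(
    key: &str,
    value: Option<&str>,
    unavailable_reason: &str,
) -> Vec<(String, Option<String>)> {
    match value {
        // Empty strings are treated as unavailable so they cannot disguise success.
        Some(text) if !text.is_empty() => vec![(key.to_string(), Some(text.to_string()))],
        _ => vec![
            (key.to_string(), None),
            (
                format!("{key}UnavailableReason"),
                Some(unavailable_reason.to_string()),
            ),
        ],
    }
}
```

Because the key is always present, a consumer such as the frontend or the CLI can tell "not reported" apart from "reported", and can display the reason text directly.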
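The authoritative source for remote update records is the `provider.upgrade` task history stored by backend-core, not the provider-gateway container log files. The frontend must aggregate these tasks by Provider and render the status, mode, task id, source, duration, policy, updater container summary, failure reason, and update time as a table or cards; the full task/result JSON may be viewed only after the operator clicks `查看原始JSON` (view raw JSON).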
@@ -152,7 +154,7 @@ Long-lived WSL providers such as D601 must not, because a single path failed, be written off outright as …
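If the node already has a dedicated Compose setup, prefer one manual rebuild with the node's local Compose: `docker compose --env-file .state/provider-<ID>.env -f <compose-file> -p <compose-project> up -d --no-deps --build --force-recreate provider-gateway`. This command must be run in a local terminal on the node, the node's own Web terminal, a system scheduled task, or a detached shell; it must not be run synchronously through the SSH passthrough provided by the very UniDesk provider-gateway that is being rebuilt, because the SSH client is cut off when the old provider container stops, which can interrupt the rebuild with the old container stopped and the new one not yet started. If the node can be reached only through UniDesk, use the detached updater of `provider.upgrade mode=schedule`, or first use the node's local `nohup`/systemd to start a rebuild script that does not depend on the lifecycle of the current provider container. Older `docker-compose` versions may fail when recreating an existing container because of the `ContainerConfig` compatibility problem; in that case the only permitted fix is to remove the target provider-gateway container and run `up -d --no-deps provider-gateway` again, and `down -v`, `docker volume rm`, or any command that affects the database named volumes must not be run. If the node currently has only a `docker run` deployment, first build the image with `docker build -f src/components/provider-gateway/Dockerfile -t unidesk_provider-gateway:<id> .`, then recreate the container under a fixed container name: use `--restart always --pid host`, mount `/var/run/docker.sock:/var/run/docker.sock`, `/home/ubuntu/unidesk:/workspace:ro`, and the node log directory to `/var/log/unidesk`, additionally mount the read-only private key directory to `/run/host-ssh` if the WSL SSH maintenance bridge is needed, and start it with the same `.state/provider-<ID>.env`. Whether Compose or `docker run` is used, both the container name and the image tag must carry the Provider ID, so that the Docker status page, the process resource table, the task history, and local troubleshooting on the node can be matched against one another.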
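The acceptance criterion after a manual upgrade is fixed to what the main server can observe, not the node container being `running`: visit the public frontend `http://74.48.78.17:18081/` and confirm that the Provider is online; then, on any compute node that has this repository and whose `config.json` contains the correct frontend login credentials, run `bun scripts/cli.ts --main-server-ip 74.48.78.17 debug dispatch <PROVIDER_ID> provider.upgrade --mode schedule --wait-ms 15000` and confirm that the task is `succeeded` and that the result contains the updater container information; finally, check the frontend again or run `bun scripts/cli.ts --main-server-ip 74.48.78.17 debug health` to confirm that the node has reconnected, metrics have recovered, and the `host.ssh` capability is present in labels. After every manual provider-gateway upgrade, the remote CLI must also be used to run `bun scripts/cli.ts --main-server-ip 74.48.78.17 debug dispatch <PROVIDER_ID> host.ssh --wait-ms 15000` and `bun scripts/cli.ts --main-server-ip 74.48.78.17 ssh <PROVIDER_ID> hostname`, to verify that the maintenance bridge was not lost in the upgrade; this remote CLI goes through the public frontend by default and does not need `--main-server-key`.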
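The acceptance criterion after a manual upgrade is fixed to what the main server can observe, not the node container being `running`: visit the public frontend `http://74.48.78.17:18081/` and confirm that the Provider is online; then, on any compute node that has this repository and whose `config.json` contains the correct frontend login credentials, run `bun scripts/cli.ts --main-server-ip 74.48.78.17 debug dispatch <PROVIDER_ID> provider.upgrade --mode schedule --wait-ms 300000` and confirm that the task is `succeeded` and that the result contains the final Compose container's `containerId`, the target gateway `version`, `restartPolicy=always`, `pidMode=host`, and a new `heartbeatTimestamp`; finally, check the frontend again or run `bun scripts/cli.ts --main-server-ip 74.48.78.17 debug health` to confirm that the node has reconnected, metrics have recovered, and the `host.ssh` capability is present in labels. After every manual provider-gateway upgrade, the remote CLI must also be used to run `bun scripts/cli.ts --main-server-ip 74.48.78.17 debug dispatch <PROVIDER_ID> host.ssh --wait-ms 15000` and `bun scripts/cli.ts --main-server-ip 74.48.78.17 ssh <PROVIDER_ID> hostname`, to verify that the maintenance bridge was not lost in the upgrade; this remote CLI goes through the public frontend by default and does not need `--main-server-key`.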
## Host SSH Maintenance Bridge
@@ -3,12 +3,13 @@ use std::sync::Arc;
use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
use axum::http::{HeaderMap, Uri};
use axum::response::Response;
use chrono::{DateTime, Duration, Utc};
use futures_util::{SinkExt, StreamExt};
use serde_json::{json, Value};
use tokio::sync::mpsc;
use crate::db::{
record_event, update_provider_heartbeat, upsert_docker_status, upsert_provider_node,
raw_task, record_event, update_provider_heartbeat, upsert_docker_status, upsert_provider_node,
upsert_system_status,
};
use crate::egress_tcp::{handle_egress_tcp_close, handle_egress_tcp_data, handle_egress_tcp_open};
@@ -17,6 +18,513 @@ use crate::json_util::compact_json_for_storage;
use crate::ssh_bridge::forward_ssh_provider_message;
use crate::state::{AppState, HttpTunnelResponse, ProviderConnection};
use crate::task_dispatcher::{is_terminal_task_status, notify_task_terminal};
use crate::types::RawTaskRow;
const PROVIDER_UPGRADE_FINAL_HEARTBEAT_TIMEOUT_MS: i64 = 240_000;
const PROVIDER_UPGRADE_FINAL_HEARTBEAT_POLL_MS: u64 = 2_000;

#[derive(Clone, Debug)]
struct ProviderUpgradeFinalHeartbeatExpectation {
    task_id: String,
    provider_id: String,
    target_version: String,
    min_heartbeat_at: DateTime<Utc>,
    deadline_at: DateTime<Utc>,
    scheduler_result: Value,
    candidate_name: Option<String>,
    last_known_good_version: Option<String>,
}

#[derive(Clone, Debug)]
struct ProviderUpgradeNodeSnapshot {
    provider_id: String,
    status: String,
    labels: Value,
}

#[derive(Clone, Debug)]
enum ProviderUpgradeFinalHeartbeatDecision {
    Pending(Value),
    Succeeded(Value),
    Failed(Value),
}

fn json_string_at(value: &Value, path: &[&str]) -> Option<String> {
    let mut current = value;
    for part in path {
        current = current.get(*part)?;
    }
    current
        .as_str()
        .filter(|text| !text.is_empty())
        .map(ToOwned::to_owned)
}

fn provider_upgrade_label_string(labels: &Value, key: &str) -> Option<String> {
    labels
        .get(key)
        .and_then(Value::as_str)
        .filter(|value| !value.is_empty())
        .map(ToOwned::to_owned)
}

fn provider_upgrade_heartbeat_at(snapshot: &ProviderUpgradeNodeSnapshot) -> Option<DateTime<Utc>> {
    provider_upgrade_label_string(&snapshot.labels, "providerGatewayHeartbeatAt").and_then(
        |value| {
            DateTime::parse_from_rfc3339(&value)
                .ok()
                .map(|parsed| parsed.with_timezone(&Utc))
        },
    )
}

fn provider_upgrade_schedule_task(task: &RawTaskRow) -> bool {
    task.command == "provider.upgrade"
        && task.payload.get("mode").and_then(Value::as_str) == Some("schedule")
}

fn provider_upgrade_target_version(task: &RawTaskRow, scheduler_result: &Value) -> Option<String> {
    json_string_at(scheduler_result, &["targetProviderGatewayVersion"])
        .or_else(|| json_string_at(scheduler_result, &["plan", "targetProviderGatewayVersion"]))
        .or_else(|| json_string_at(&task.payload, &["targetProviderGatewayVersion"]))
}

fn provider_upgrade_last_known_good_version(scheduler_result: &Value) -> Option<String> {
    json_string_at(scheduler_result, &["providerGatewayVersion"])
        .or_else(|| json_string_at(scheduler_result, &["plan", "providerGatewayVersion"]))
}

fn provider_upgrade_candidate_name(scheduler_result: &Value) -> Option<String> {
    json_string_at(scheduler_result, &["candidateName"])
        .or_else(|| json_string_at(scheduler_result, &["plan", "candidateName"]))
}

fn provider_upgrade_expectation(
    task: &RawTaskRow,
    scheduler_result: &Value,
    now: DateTime<Utc>,
) -> Option<ProviderUpgradeFinalHeartbeatExpectation> {
    let target_version = provider_upgrade_target_version(task, scheduler_result)?;
    Some(ProviderUpgradeFinalHeartbeatExpectation {
        task_id: task.id.clone(),
        provider_id: task.provider_id.clone(),
        target_version,
        min_heartbeat_at: now,
        deadline_at: now + Duration::milliseconds(PROVIDER_UPGRADE_FINAL_HEARTBEAT_TIMEOUT_MS),
        scheduler_result: scheduler_result.clone(),
        candidate_name: provider_upgrade_candidate_name(scheduler_result),
        last_known_good_version: provider_upgrade_last_known_good_version(scheduler_result),
    })
}

fn provider_upgrade_snapshot_is_candidate(
    snapshot: &ProviderUpgradeNodeSnapshot,
    candidate_name: Option<&str>,
) -> bool {
    let container_name =
        provider_upgrade_label_string(&snapshot.labels, "providerGatewayContainerName");
    if let (Some(container_name), Some(candidate_name)) =
        (container_name.as_deref(), candidate_name)
    {
        if container_name == candidate_name {
            return true;
        }
    }
    container_name
        .as_deref()
        .is_some_and(|name| name.contains("-candidate-"))
}

fn insert_json_field_with_unavailable_reason(
    object: &mut serde_json::Map<String, Value>,
    key: &str,
    value: Option<String>,
    unavailable_reason: &str,
) {
    if let Some(value) = value {
        object.insert(key.to_string(), Value::String(value));
    } else {
        object.insert(key.to_string(), Value::Null);
        object.insert(
            format!("{key}UnavailableReason"),
            Value::String(unavailable_reason.to_string()),
        );
    }
}

fn provider_upgrade_final_container_json(
    snapshot: Option<&ProviderUpgradeNodeSnapshot>,
    unavailable_reason: &str,
) -> Value {
    let labels = snapshot.map(|snapshot| &snapshot.labels);
    let heartbeat = snapshot
        .and_then(provider_upgrade_heartbeat_at)
        .map(|value| value.to_rfc3339_opts(chrono::SecondsFormat::Millis, true));
    let mut object = serde_json::Map::new();
    insert_json_field_with_unavailable_reason(
        &mut object,
        "containerId",
        labels
            .and_then(|labels| provider_upgrade_label_string(labels, "providerGatewayContainerId")),
        unavailable_reason,
    );
    insert_json_field_with_unavailable_reason(
        &mut object,
        "containerName",
        labels.and_then(|labels| {
            provider_upgrade_label_string(labels, "providerGatewayContainerName")
        }),
        unavailable_reason,
    );
    insert_json_field_with_unavailable_reason(
        &mut object,
        "version",
        labels.and_then(|labels| provider_upgrade_label_string(labels, "providerGatewayVersion")),
        unavailable_reason,
    );
    insert_json_field_with_unavailable_reason(
        &mut object,
        "restartPolicy",
        labels.and_then(|labels| {
            provider_upgrade_label_string(labels, "providerGatewayRestartPolicy")
        }),
        unavailable_reason,
    );
    insert_json_field_with_unavailable_reason(
        &mut object,
        "pidMode",
        labels.and_then(|labels| provider_upgrade_label_string(labels, "providerGatewayPidMode")),
        unavailable_reason,
    );
    insert_json_field_with_unavailable_reason(
        &mut object,
        "heartbeatTimestamp",
        heartbeat,
        unavailable_reason,
    );
    Value::Object(object)
}

fn evaluate_provider_upgrade_final_heartbeat(
    expectation: &ProviderUpgradeFinalHeartbeatExpectation,
    snapshot: Option<&ProviderUpgradeNodeSnapshot>,
    now: DateTime<Utc>,
) -> ProviderUpgradeFinalHeartbeatDecision {
    let candidate_snapshot = snapshot.is_some_and(|snapshot| {
        provider_upgrade_snapshot_is_candidate(snapshot, expectation.candidate_name.as_deref())
    });
    let unavailable_reason = if candidate_snapshot {
        "latest heartbeat came from candidate validation container, not final promoted compose service"
    } else {
        "final promoted compose heartbeat not observed"
    };
    let final_container_snapshot = if candidate_snapshot { None } else { snapshot };

    if let Some(snapshot) = snapshot {
        let observed_version =
            provider_upgrade_label_string(&snapshot.labels, "providerGatewayVersion");
        let heartbeat_is_new = provider_upgrade_heartbeat_at(snapshot)
            .is_some_and(|last| last >= expectation.min_heartbeat_at);
        let same_provider = snapshot.provider_id.as_str() == expectation.provider_id.as_str();
        let online = snapshot.status == "online";
        if same_provider
            && online
            && observed_version.as_deref() == Some(expectation.target_version.as_str())
            && heartbeat_is_new
            && !candidate_snapshot
        {
            let restart_policy =
                provider_upgrade_label_string(&snapshot.labels, "providerGatewayRestartPolicy");
            let pid_mode =
                provider_upgrade_label_string(&snapshot.labels, "providerGatewayPidMode");
            if restart_policy.as_deref() != Some("always") || pid_mode.as_deref() != Some("host") {
                return ProviderUpgradeFinalHeartbeatDecision::Failed(json!({
                    "ok": false,
                    "reason": "final-promoted-runtime-guard-failed",
                    "blocker": true,
                    "providerId": expectation.provider_id.as_str(),
                    "taskId": expectation.task_id.as_str(),
                    "targetProviderGatewayVersion": expectation.target_version.as_str(),
                    "finalPromotedContainer": provider_upgrade_final_container_json(Some(snapshot), "runtime guard heartbeat fields are incomplete"),
                    "rollback": {
                        "preferred": "last-known-good",
                        "required": true,
                        "lastKnownGoodProviderGatewayVersion": expectation.last_known_good_version.as_deref(),
                        "updaterRollbackExpected": true
                    },
                    "candidateValidationWasTerminal": false,
                    "schedulerResult": expectation.scheduler_result.clone()
                }));
            }
            return ProviderUpgradeFinalHeartbeatDecision::Succeeded(json!({
                "ok": true,
                "reason": "final-promoted-heartbeat-confirmed",
                "providerId": expectation.provider_id.as_str(),
                "taskId": expectation.task_id.as_str(),
                "targetProviderGatewayVersion": expectation.target_version.as_str(),
                "finalPromotedContainer": provider_upgrade_final_container_json(Some(snapshot), "not reported by provider heartbeat"),
                "heartbeat": {
                    "minHeartbeatAt": expectation.min_heartbeat_at.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
                    "observedAt": provider_upgrade_heartbeat_at(snapshot).map(|value| value.to_rfc3339_opts(chrono::SecondsFormat::Millis, true))
                },
                "candidateValidationWasTerminal": false,
                "schedulerResult": expectation.scheduler_result.clone()
            }));
        }
    }

    let pending_result = json!({
        "ok": false,
        "status": "running",
        "reason": "awaiting-final-promoted-heartbeat",
        "providerId": expectation.provider_id.as_str(),
        "taskId": expectation.task_id.as_str(),
        "targetProviderGatewayVersion": expectation.target_version.as_str(),
        "deadlineAt": expectation.deadline_at.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
        "candidateValidationWasTerminal": false,
        "finalPromotedContainer": provider_upgrade_final_container_json(final_container_snapshot, unavailable_reason),
        "schedulerResult": expectation.scheduler_result.clone()
    });
    if now < expectation.deadline_at {
        return ProviderUpgradeFinalHeartbeatDecision::Pending(pending_result);
    }

    ProviderUpgradeFinalHeartbeatDecision::Failed(json!({
        "ok": false,
        "reason": "final-promoted-heartbeat-timeout",
        "error": "final promoted provider-gateway compose container did not report a target-version heartbeat before the guard window expired",
        "blocker": true,
        "providerId": expectation.provider_id.as_str(),
        "taskId": expectation.task_id.as_str(),
        "targetProviderGatewayVersion": expectation.target_version.as_str(),
        "heartbeat": {
            "minHeartbeatAt": expectation.min_heartbeat_at.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
            "deadlineAt": expectation.deadline_at.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
            "observedAt": snapshot.and_then(provider_upgrade_heartbeat_at).map(|value| value.to_rfc3339_opts(chrono::SecondsFormat::Millis, true))
        },
        "finalPromotedContainer": provider_upgrade_final_container_json(final_container_snapshot, unavailable_reason),
        "rollback": {
            "preferred": "last-known-good",
            "required": true,
            "lastKnownGoodProviderGatewayVersion": expectation.last_known_good_version.as_deref(),
            "updaterRollbackExpected": true
        },
        "candidateValidationWasTerminal": false,
        "schedulerResult": expectation.scheduler_result.clone()
    }))
}
async fn provider_upgrade_node_snapshot(
state: &Arc<AppState>,
provider_id: &str,
) -> anyhow::Result<Option<ProviderUpgradeNodeSnapshot>> {
if !state.db_ready() {
return Ok(None);
}
let client = state.pool.get().await?;
let row = client
.query_opt(
"SELECT provider_id, status, labels FROM unidesk_nodes WHERE provider_id = $1 LIMIT 1",
&[&provider_id],
)
.await?;
Ok(row.map(|row| ProviderUpgradeNodeSnapshot {
provider_id: row.get("provider_id"),
status: row.get("status"),
labels: row.get("labels"),
}))
}
async fn complete_provider_upgrade_task(
state: &Arc<AppState>,
task_id: &str,
provider_id: &str,
status: &str,
result: Value,
) -> anyhow::Result<()> {
let client = state.pool.get().await?;
let updated = client
.execute(
r#"
UPDATE unidesk_tasks
SET status = $2, result = $3::jsonb, updated_at = now()
WHERE id = $1
AND command = 'provider.upgrade'
AND status NOT IN ('succeeded', 'failed')
"#,
&[&task_id, &status, &result],
)
.await?;
record_event(
state,
"provider_upgrade_final_heartbeat_result",
provider_id,
json!({
"providerId": provider_id,
"taskId": task_id,
"status": status,
"result": result,
"updated": updated,
}),
)
.await;
if updated > 0 && is_terminal_task_status(status) {
notify_task_terminal(state, task_id).await?;
}
Ok(())
}
async fn mark_provider_upgrade_awaiting_final_heartbeat(
state: &Arc<AppState>,
expectation: &ProviderUpgradeFinalHeartbeatExpectation,
pending_result: Value,
) -> anyhow::Result<()> {
let client = state.pool.get().await?;
client
.execute(
r#"
UPDATE unidesk_tasks
SET status = 'running', result = $2::jsonb, updated_at = now()
WHERE id = $1
AND command = 'provider.upgrade'
AND status NOT IN ('succeeded', 'failed')
"#,
&[&expectation.task_id, &pending_result],
)
.await?;
record_event(
state,
"provider_upgrade_final_heartbeat_wait",
&expectation.provider_id,
json!({
"providerId": expectation.provider_id.as_str(),
"taskId": expectation.task_id.as_str(),
"targetProviderGatewayVersion": expectation.target_version.as_str(),
"deadlineAt": expectation.deadline_at.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
"candidateValidationWasTerminal": false,
}),
)
.await;
Ok(())
}
async fn monitor_provider_upgrade_final_heartbeat(
state: Arc<AppState>,
expectation: ProviderUpgradeFinalHeartbeatExpectation,
) -> anyhow::Result<()> {
loop {
let snapshot = provider_upgrade_node_snapshot(&state, &expectation.provider_id).await?;
match evaluate_provider_upgrade_final_heartbeat(&expectation, snapshot.as_ref(), Utc::now())
{
ProviderUpgradeFinalHeartbeatDecision::Succeeded(result) => {
complete_provider_upgrade_task(
&state,
&expectation.task_id,
&expectation.provider_id,
"succeeded",
result,
)
.await?;
break;
}
ProviderUpgradeFinalHeartbeatDecision::Failed(result) => {
complete_provider_upgrade_task(
&state,
&expectation.task_id,
&expectation.provider_id,
"failed",
result,
)
.await?;
break;
}
ProviderUpgradeFinalHeartbeatDecision::Pending(_) => {
tokio::time::sleep(std::time::Duration::from_millis(
PROVIDER_UPGRADE_FINAL_HEARTBEAT_POLL_MS,
))
.await;
}
}
}
Ok(())
}
async fn ensure_provider_upgrade_final_heartbeat_watcher(
state: &Arc<AppState>,
expectation: ProviderUpgradeFinalHeartbeatExpectation,
) {
let task_id = expectation.task_id.clone();
let inserted = {
let mut active = state.active_provider_upgrade_watchers.lock().await;
active.insert(task_id.clone())
};
if !inserted {
return;
}
let state_for_task = state.clone();
tokio::spawn(async move {
let provider_id = expectation.provider_id.clone();
if let Err(error) =
monitor_provider_upgrade_final_heartbeat(state_for_task.clone(), expectation).await
{
state_for_task.log(
"error",
"provider_upgrade_final_heartbeat_watcher_failed",
Some(json!({ "taskId": task_id.as_str(), "providerId": provider_id.as_str(), "error": error.to_string() })),
);
}
let mut active = state_for_task.active_provider_upgrade_watchers.lock().await;
active.remove(&task_id);
});
}
async fn maybe_handle_provider_upgrade_schedule_status(
state: &Arc<AppState>,
provider_id: &str,
task_id: &str,
status: &str,
stored_result: &Value,
) -> anyhow::Result<bool> {
let starts_final_guard = status == "succeeded"
|| (status == "running"
&& stored_result.get("mode").and_then(Value::as_str) == Some("schedule")
&& stored_result
.get("terminalStatusManagedByBackendCore")
.and_then(Value::as_bool)
== Some(true));
if !starts_final_guard {
return Ok(false);
}
let Some(task) = raw_task(state, task_id).await? else {
return Ok(false);
};
if !provider_upgrade_schedule_task(&task) {
return Ok(false);
}
let now = Utc::now();
let Some(expectation) = provider_upgrade_expectation(&task, stored_result, now) else {
let result = json!({
"ok": false,
"reason": "target-provider-gateway-version-unavailable",
"error": "provider.upgrade mode=schedule cannot be finalized because targetProviderGatewayVersion was not reported",
"blocker": true,
"providerId": provider_id,
"taskId": task_id,
"finalPromotedContainer": provider_upgrade_final_container_json(None, "target gateway version unavailable before final heartbeat guard"),
"candidateValidationWasTerminal": false,
"schedulerResult": stored_result
});
complete_provider_upgrade_task(state, task_id, provider_id, "failed", result).await?;
return Ok(true);
};
let pending = match evaluate_provider_upgrade_final_heartbeat(&expectation, None, now) {
ProviderUpgradeFinalHeartbeatDecision::Pending(result) => result,
ProviderUpgradeFinalHeartbeatDecision::Succeeded(result)
| ProviderUpgradeFinalHeartbeatDecision::Failed(result) => result,
};
mark_provider_upgrade_awaiting_final_heartbeat(state, &expectation, pending).await?;
ensure_provider_upgrade_final_heartbeat_watcher(state, expectation).await;
Ok(true)
}
pub async fn provider_ws(
state: Arc<AppState>,
@@ -202,7 +710,16 @@ async fn handle_provider_text(
let _ = connection.sender.send(Message::Text(json!({ "type": "ack", "requestId": "register", "ok": true, "message": "registered" }).to_string()));
}
"heartbeat" => {
let labels = message.get("labels").cloned().unwrap_or_else(|| json!({}));
let mut labels = message.get("labels").cloned().unwrap_or_else(|| json!({}));
if let Some(object) = labels.as_object_mut() {
object.insert(
"providerGatewayHeartbeatAt".to_string(),
message
.get("at")
.cloned()
.unwrap_or_else(|| Value::String(Utc::now().to_rfc3339())),
);
}
update_provider_heartbeat(state, &provider_id, &labels).await?;
state.log(
"debug",
@@ -241,6 +758,17 @@ async fn handle_provider_text(
compact_json_for_storage(&message.get("result").cloned().unwrap_or_else(
|| json!({ "message": message.get("message").cloned().unwrap_or(Value::Null) }),
));
if maybe_handle_provider_upgrade_schedule_status(
state,
&provider_id,
task_id,
status,
&stored_result,
)
.await?
{
return Ok(());
}
let client = state.pool.get().await?;
client.execute(
r#"
@@ -339,3 +867,122 @@ pub async fn mark_stale_providers_offline(state: &Arc<AppState>) -> anyhow::Resu
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
fn dt(value: &str) -> DateTime<Utc> {
DateTime::parse_from_rfc3339(value)
.expect("valid fixture timestamp")
.with_timezone(&Utc)
}
fn expectation() -> ProviderUpgradeFinalHeartbeatExpectation {
ProviderUpgradeFinalHeartbeatExpectation {
task_id: "task_upgrade_contract".to_string(),
provider_id: "D601".to_string(),
target_version: "0.2.27".to_string(),
min_heartbeat_at: dt("2026-05-24T10:00:00Z"),
deadline_at: dt("2026-05-24T10:04:00Z"),
scheduler_result: json!({
"mode": "schedule",
"providerGatewayVersion": "0.2.26",
"targetProviderGatewayVersion": "0.2.27",
"plan": {
"candidateName": "unidesk-provider-gateway-D601-candidate-task-upgrade-contract"
}
}),
candidate_name: Some(
"unidesk-provider-gateway-D601-candidate-task-upgrade-contract".to_string(),
),
last_known_good_version: Some("0.2.26".to_string()),
}
}
#[test]
fn candidate_validation_heartbeat_is_not_terminal_success() {
let snapshot = ProviderUpgradeNodeSnapshot {
provider_id: "D601".to_string(),
status: "online".to_string(),
labels: json!({
"providerGatewayVersion": "0.2.27",
"providerGatewayContainerId": "candidate-container-id",
"providerGatewayContainerName": "unidesk-provider-gateway-D601-candidate-task-upgrade-contract",
"providerGatewayRestartPolicy": "no",
"providerGatewayPidMode": "host",
"providerGatewayHeartbeatAt": "2026-05-24T10:01:00Z"
}),
};
let decision = evaluate_provider_upgrade_final_heartbeat(
&expectation(),
Some(&snapshot),
dt("2026-05-24T10:05:00Z"),
);
let ProviderUpgradeFinalHeartbeatDecision::Failed(result) = decision else {
panic!("candidate heartbeat must fail after final promoted heartbeat timeout");
};
assert_eq!(
result.get("reason").and_then(Value::as_str),
Some("final-promoted-heartbeat-timeout")
);
assert_eq!(
result
.get("candidateValidationWasTerminal")
.and_then(Value::as_bool),
Some(false)
);
assert_eq!(
result
.pointer("/rollback/preferred")
.and_then(Value::as_str),
Some("last-known-good")
);
assert_eq!(
result
.pointer("/finalPromotedContainer/containerIdUnavailableReason")
.and_then(Value::as_str),
Some("latest heartbeat came from candidate validation container, not final promoted compose service")
);
}
#[test]
fn final_promoted_compose_heartbeat_is_success() {
let snapshot = ProviderUpgradeNodeSnapshot {
provider_id: "D601".to_string(),
status: "online".to_string(),
labels: json!({
"providerGatewayVersion": "0.2.27",
"providerGatewayContainerId": "final-container-id",
"providerGatewayContainerName": "unidesk-provider-gateway-D601",
"providerGatewayRestartPolicy": "always",
"providerGatewayPidMode": "host",
"providerGatewayHeartbeatAt": "2026-05-24T10:02:00Z"
}),
};
let decision = evaluate_provider_upgrade_final_heartbeat(
&expectation(),
Some(&snapshot),
dt("2026-05-24T10:02:01Z"),
);
let ProviderUpgradeFinalHeartbeatDecision::Succeeded(result) = decision else {
panic!("final compose heartbeat should succeed");
};
assert_eq!(
result.get("reason").and_then(Value::as_str),
Some("final-promoted-heartbeat-confirmed")
);
assert_eq!(
result
.pointer("/finalPromotedContainer/containerId")
.and_then(Value::as_str),
Some("final-container-id")
);
assert_eq!(
result
.pointer("/finalPromotedContainer/heartbeatTimestamp")
.and_then(Value::as_str),
Some("2026-05-24T10:02:00.000Z")
);
}
}
+3 -1
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
@@ -51,6 +51,7 @@ pub struct AppState {
pub request_performance_samples: Mutex<Vec<RequestPerformanceSample>>,
pub operation_performance_samples: Mutex<Vec<OperationPerformanceSample>>,
pub active_scheduled_runs: Mutex<std::collections::HashSet<String>>,
pub active_provider_upgrade_watchers: Mutex<HashSet<String>>,
}
impl AppState {
@@ -84,6 +85,7 @@ impl AppState {
request_performance_samples: Mutex::new(Vec::new()),
operation_performance_samples: Mutex::new(Vec::new()),
active_scheduled_runs: Mutex::new(std::collections::HashSet::new()),
active_provider_upgrade_watchers: Mutex::new(HashSet::new()),
})
}
@@ -3,8 +3,11 @@ FROM ${CODE_QUEUE_BASE_IMAGE}
ENV PLAYWRIGHT_BROWSERS_PATH=/ms-playwright
ENV UNIDESK_SKILLS_PATH=/root/.agents/skills
ENV RUSTUP_HOME=/usr/local/rustup
ENV CARGO_HOME=/usr/local/cargo
ENV PATH=/usr/local/cargo/bin:${PATH}
RUN (command -v codex >/dev/null 2>&1 && command -v opencode >/dev/null 2>&1 && command -v docker >/dev/null 2>&1 && command -v gh >/dev/null 2>&1 && command -v rg >/dev/null 2>&1 && command -v cargo >/dev/null 2>&1 && command -v rustc >/dev/null 2>&1 && command -v rustfmt >/dev/null 2>&1 && command -v xvfb-run >/dev/null 2>&1 && command -v xauth >/dev/null 2>&1 && test -x "$PLAYWRIGHT_BROWSERS_PATH/chromium_headless_shell-1217/chrome-headless-shell-linux64/chrome-headless-shell") \
RUN (command -v codex >/dev/null 2>&1 && command -v opencode >/dev/null 2>&1 && command -v docker >/dev/null 2>&1 && command -v gh >/dev/null 2>&1 && command -v rg >/dev/null 2>&1 && command -v cargo >/dev/null 2>&1 && command -v rustc >/dev/null 2>&1 && rustc --version | awk '{ split($2, v, "."); exit ! (v[1] > 1 || (v[1] == 1 && v[2] >= 86)) }' && command -v rustfmt >/dev/null 2>&1 && command -v xvfb-run >/dev/null 2>&1 && command -v xauth >/dev/null 2>&1 && test -x "$PLAYWRIGHT_BROWSERS_PATH/chromium_headless_shell-1217/chrome-headless-shell-linux64/chrome-headless-shell") \
|| (apt-get update \
&& apt-get install -y --no-install-recommends \
bash \
@@ -42,6 +45,8 @@ RUN (command -v codex >/dev/null 2>&1 && command -v opencode >/dev/null 2>&1 &&
xauth \
xvfb \
xz-utils \
&& curl https://sh.rustup.rs -sSf | sh -s -- -y --profile minimal --default-toolchain stable \
&& rustup component add rustfmt \
&& mkdir -p /usr/local/lib/docker/cli-plugins /root/.docker/cli-plugins \
&& ln -sf /usr/bin/docker-compose /usr/local/lib/docker/cli-plugins/docker-compose \
&& ln -sf /usr/bin/docker-compose /root/.docker/cli-plugins/docker-compose \
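The updated `RUN` guard treats the base image as complete only if `rustc --version` reports at least 1.86. The `awk` expression splits the second whitespace-separated field on `.` and compares major and minor numerically. The same comparison can be sketched in TypeScript as follows; the function name `rustcAtLeast` is hypothetical and exists only to make the rule easier to read and test.

```typescript
// Returns true when `rustc --version` output reports at least major.minor.
// Mirrors the awk check: v[1] > major || (v[1] == major && v[2] >= minor).
function rustcAtLeast(versionOutput: string, major: number, minor: number): boolean {
  const fields = versionOutput.trim().split(/\s+/);
  const version = fields[1] ?? ""; // second field, e.g. "1.86.0"
  const parts = version.split(".");
  const actualMajor = Number.parseInt(parts[0] ?? "", 10);
  const actualMinor = Number.parseInt(parts[1] ?? "", 10);
  if (Number.isNaN(actualMajor) || Number.isNaN(actualMinor)) {
    return false; // unlike awk, this sketch rejects output it cannot parse
  }
  return actualMajor > major || (actualMajor === major && actualMinor >= minor);
}
```

When the check fails, the shell `||` branch runs and reinstalls the toolchain through rustup, so an image that ships an older `rustc` is upgraded rather than accepted as-is.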
+1 -1
@@ -1,6 +1,6 @@
{
"name": "@unidesk/provider-gateway",
"version": "0.2.26",
"version": "0.2.27",
"private": true,
"type": "module",
"scripts": {
+101 -13
@@ -1671,7 +1671,24 @@ function upgradePlan(taskId: string): Record<string, JsonValue> {
const validationNeedleAck = `"requestId":"register"`;
const validationNeedleOk = `"ok":true`;
const validationAttempts = Math.max(1, Math.ceil(validationTimeoutMs / 2000));
const finalPromotedReconnectTimeoutMs = 180_000;
const finalPromotedHeartbeatTimeoutMs = 240_000;
const finalValidationAttempts = Math.max(1, Math.ceil(finalPromotedReconnectTimeoutMs / 2000));
const targetGatewayMetadata = readTargetGatewayMetadata(workspace);
const unavailableFinalPromotedContainer = {
containerId: null,
containerIdUnavailableReason: "mode=plan or schedule has not yet observed backend-core final promoted heartbeat",
containerName: null,
containerNameUnavailableReason: "mode=plan or schedule has not yet observed backend-core final promoted heartbeat",
version: null,
versionUnavailableReason: "mode=plan or schedule has not yet observed backend-core final promoted heartbeat",
restartPolicy: null,
restartPolicyUnavailableReason: "mode=plan or schedule has not yet observed backend-core final promoted heartbeat",
pidMode: null,
pidModeUnavailableReason: "mode=plan or schedule has not yet observed backend-core final promoted heartbeat",
heartbeatTimestamp: null,
heartbeatTimestampUnavailableReason: "mode=plan or schedule has not yet observed backend-core final promoted heartbeat",
};
const compactBuildContextRefreshScript = [
`provider_src=${shellQuote(`${workspaceRoot}/src/components/provider-gateway`)}`,
`shared_src=${shellQuote(`${workspaceRoot}/src/components/shared`)}`,
@@ -1693,6 +1710,33 @@ function upgradePlan(taskId: string): Record<string, JsonValue> {
` fi`,
`done`,
].join("\n");
const rollbackToLastKnownGoodScript = [
`rollback_to_last_known_good() {`,
` reason="$1"`,
` echo "rollback_to_last_known_good: $reason" >&2`,
` if [ -n "$final_container" ]; then docker rm -f "$final_container" >/dev/null 2>&1 || true; fi`,
` docker rm -f ${shellQuote(candidateName)} >/dev/null 2>&1 || true`,
` if [ -z "$old_name" ] || [ -z "$old_image" ]; then echo "rollback_to_last_known_good unavailable: old_name=$old_name old_image=$old_image" >&2; return 1; fi`,
` docker rm -f "$old_name" >/dev/null 2>&1 || true`,
[
` docker run -d`,
`--name "$old_name"`,
`--restart always`,
`--pid "\${old_pid_mode:-host}"`,
`$network_arg`,
`--env-file "$candidate_env_file"`,
`--label ${shellQuote(`com.docker.compose.project=${config.upgradeComposeProject}`)}`,
`--label ${shellQuote(`com.docker.compose.service=${config.upgradeService}`)}`,
`--label ${shellQuote("com.docker.compose.oneoff=False")}`,
`--label ${shellQuote(`unidesk.upgrade.rollback=${taskId}`)}`,
`$mount_args`,
`$extra_host_args`,
`"$old_image" >/dev/null || return 1`,
].join(" "),
` if [ -n "$network_names" ]; then for net in $network_names; do if [ "$net" != "$first_network" ]; then docker network connect "$net" "$old_name" >/dev/null 2>&1 || true; fi; done; fi`,
` echo "rollback_to_last_known_good restored old gateway container name=$old_name image=$old_image reason=$reason" >&2`,
`}`,
].join("\n");
const script = [
"set -eu",
`cd ${shellQuote(workspace)}`,
@@ -1701,16 +1745,24 @@ function upgradePlan(taskId: string): Record<string, JsonValue> {
`first_old=""`,
`for old_id in $old_ids; do first_old="$old_id"; break; done`,
`old_name=""`,
`old_image=""`,
`old_restart=""`,
`old_pid_mode=""`,
`final_container=""`,
`mount_args=""`,
`extra_host_args=""`,
`candidate_env_file=${shellQuote(`/tmp/${candidateName}.env`)}`,
`network_names=""`,
`first_network=""`,
`network_arg=""`,
rollbackToLastKnownGoodScript,
`if [ -n "$old_ids" ]; then docker update --restart always $old_ids >/dev/null 2>&1 || true; fi`,
`if [ -z "$first_old" ]; then echo "no existing provider-gateway compose container found; cannot perform safe in-place upgrade" >&2; exit 1; fi`,
`old_name=$(docker inspect --format '{{.Name}}' "$first_old")`,
`old_name="\${old_name#/}"`,
`old_image=$(docker inspect --format '{{.Image}}' "$first_old")`,
`old_restart=$(docker inspect --format '{{.HostConfig.RestartPolicy.Name}}' "$first_old")`,
`old_pid_mode=$(docker inspect --format '{{.HostConfig.PidMode}}' "$first_old")`,
`docker inspect --format '{{range .Config.Env}}{{println .}}{{end}}' "$first_old" > "$candidate_env_file"`,
`mount_args=$(docker inspect --format '{{range .Mounts}}{{printf "-v %s:%s" .Source .Destination}}{{if not .RW}}{{printf ":ro"}}{{end}}{{printf "\\n"}}{{end}}' "$first_old")`,
`extra_host_args=$(docker inspect --format '{{range .HostConfig.ExtraHosts}}{{printf "--add-host %s\\n" .}}{{end}}' "$first_old")`,
@@ -1745,12 +1797,18 @@ function upgradePlan(taskId: string): Record<string, JsonValue> {
`docker rm -f ${shellQuote(candidateName)} >/dev/null 2>&1 || true`,
composeUpCommand.map(shellQuote).join(" "),
`final_container="$old_name"`,
`if [ -z "$final_container" ]; then final_container=$(${listServiceContainersCommand.map(shellQuote).join(" ")} | head -n 1); fi`,
`if [ -z "$final_container" ] || ! docker inspect "$final_container" >/dev/null 2>&1; then final_container=$(${listServiceContainersCommand.map(shellQuote).join(" ")} | head -n 1); fi`,
`if [ -z "$final_container" ]; then echo "final provider-gateway compose container not found after promotion" >&2; rollback_to_last_known_good "final-compose-container-missing" || true; rm -f "$candidate_env_file"; exit 1; fi`,
`final_container_id=$(docker inspect --format '{{.Id}}' "$final_container")`,
`final_restart=$(docker inspect --format '{{.HostConfig.RestartPolicy.Name}}' "$final_container")`,
`final_pid_mode=$(docker inspect --format '{{.HostConfig.PidMode}}' "$final_container")`,
`if [ "$final_restart" != "always" ] || [ "$final_pid_mode" != "host" ]; then echo "final provider-gateway runtime guard failed: restart=$final_restart pid=$final_pid_mode" >&2; exit 1; fi`,
`if [ "$final_restart" != "always" ] || [ "$final_pid_mode" != "host" ]; then echo "final provider-gateway runtime guard failed: restart=$final_restart pid=$final_pid_mode" >&2; rollback_to_last_known_good "final-runtime-guard-failed" || true; rm -f "$candidate_env_file"; exit 1; fi`,
`final_attempt=0`,
`final_validated=0`,
`while [ "$final_attempt" -lt "${finalValidationAttempts}" ]; do final_logs=$(docker logs "$final_container" 2>&1 || true); final_has_open=0; final_has_ack=0; final_has_ok=0; case "$final_logs" in *${shellQuote(validationNeedleOpen)}*) final_has_open=1;; esac; case "$final_logs" in *${shellQuote(validationNeedleAck)}*) final_has_ack=1;; esac; case "$final_logs" in *${shellQuote(validationNeedleOk)}*) final_has_ok=1;; esac; if [ "$final_has_open" = "1" ] && [ "$final_has_ack" = "1" ] && [ "$final_has_ok" = "1" ]; then final_validated=1; break; fi; final_running=$(docker inspect --format '{{.State.Running}}' "$final_container" 2>/dev/null || true); if [ "$final_running" != "true" ]; then break; fi; final_attempt=$((final_attempt + 1)); sleep 2; done`,
`if [ "$final_validated" != "1" ]; then echo "final provider-gateway compose container did not reconnect to backend-core before local guard timeout; backend-core must not treat candidate validation as success" >&2; docker logs "$final_container" >&2 || true; rollback_to_last_known_good "final-promoted-container-reconnect-timeout" || true; rm -f "$candidate_env_file"; exit 1; fi`,
`rm -f "$candidate_env_file"`,
`echo "candidate provider-gateway validated and promoted"`,
`echo "candidate provider-gateway validated; final promoted compose container reconnected locally: container=$final_container id=$final_container_id restart=$final_restart pid=$final_pid_mode"`,
].join("\n");
const dockerRunCommand = [
"docker",
@@ -1805,16 +1863,23 @@ function upgradePlan(taskId: string): Record<string, JsonValue> {
buildBeforeCandidate: true,
oldGatewaySleepMs: sleepMs,
validationTimeoutMs,
finalPromotedReconnectTimeoutMs,
finalPromotedHeartbeatTimeoutMs,
oldGatewayRestartPolicyBeforeSleep: "always",
promoteOnlyAfterCandidateValidation: true,
candidateRestartPolicyAfterPromotion: "always",
candidateFinalRestartPolicyValidation: true,
candidateUsesOldContainerMounts: true,
candidateUsesOldContainerNetworks: true,
candidateUsesOldContainerExtraHosts: true,
candidateUsesOldContainerEnvironment: true,
candidateUsesHostPidNamespace: true,
startupSelfHealsRestartPolicy: true,
candidateValidationIsNotTerminalSuccess: true,
finalPromotedComposeHeartbeatRequired: true,
finalPromotedContainerMustNotBeCandidate: true,
localRollbackToLastKnownGoodOnFinalReconnectFailure: true,
backendCoreOwnsFinalTerminalStatus: true,
candidateRestartPolicyAfterPromotion: "always",
candidateFinalRestartPolicyValidation: true,
candidateUsesOldContainerMounts: true,
candidateUsesOldContainerNetworks: true,
candidateUsesOldContainerExtraHosts: true,
candidateUsesOldContainerEnvironment: true,
candidateUsesHostPidNamespace: true,
startupSelfHealsRestartPolicy: true,
dockerStatusReportsRestartPolicyAndPidMode: true,
refreshesCompactBuildContextBeforeBuild: true,
removeScope: {
@@ -1825,6 +1890,17 @@ function upgradePlan(taskId: string): Record<string, JsonValue> {
namedVolumesPreserved: true,
},
dockerRunCommand,
finalPromotedHeartbeatConfirmation: {
required: true,
providerId: config.providerId,
targetProviderGatewayVersion: targetGatewayMetadata.version,
backendCoreNodeRegistry: "/api/nodes",
heartbeatTimeoutMs: finalPromotedHeartbeatTimeoutMs,
finalContainerMustNotEqualCandidate: candidateName,
requiredResultFields: ["containerId", "version", "restartPolicy", "pidMode", "heartbeatTimestamp"],
unavailableResult: unavailableFinalPromotedContainer,
},
finalPromotedContainer: unavailableFinalPromotedContainer,
compactBuildContextCandidates,
workspaceStateMount: `${config.upgradeHostProjectRoot.replace(/\/+$/, "")}/.state:${workspaceRoot}/.state`,
};
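The local reconnect guard in the updater script polls `docker logs` every 2 seconds, stops early if the container is no longer running, and accepts the final container only when all three log needles are present. The sketch below restates that loop in TypeScript. The function names and the injected `readLogs`, `isRunning`, and `sleep` callbacks are hypothetical, and the value `connect_open` for the first needle is taken from the surrounding documentation rather than from `validationNeedleOpen`, whose definition is not shown in these hunks.

```typescript
// Number of polls for a timeout, matching Math.max(1, Math.ceil(timeoutMs / 2000)).
function attemptsFor(timeoutMs: number, intervalMs: number): number {
  return Math.max(1, Math.ceil(timeoutMs / intervalMs));
}

// True only when every needle appears somewhere in the accumulated logs.
// Each needle is matched independently, as in the shell `case` checks.
function logsShowReconnect(logs: string, needles: readonly string[]): boolean {
  return needles.every((needle) => logs.includes(needle));
}

// Polls until the logs show a reconnect, the container stops, or attempts run out.
async function waitForReconnect(
  readLogs: () => Promise<string>,
  isRunning: () => Promise<boolean>,
  needles: readonly string[],
  attempts: number,
  sleep: (ms: number) => Promise<void>,
  intervalMs = 2000,
): Promise<boolean> {
  for (let attempt = 0; attempt < attempts; attempt += 1) {
    if (logsShowReconnect(await readLogs(), needles)) {
      return true;
    }
    if (!(await isRunning())) {
      return false; // container exited; further polling cannot succeed
    }
    await sleep(intervalMs);
  }
  return false;
}

// Assumed needle values; see the note above about `connect_open`.
const reconnectNeedles = ["connect_open", `"requestId":"register"`, `"ok":true`];
```

With `finalPromotedReconnectTimeoutMs` set to 180_000, `attemptsFor(180_000, 2000)` yields 90 polls. Because the needles are matched independently, an `"ok":true` from an unrelated log line would also satisfy the third check, which is one reason the PR treats this local guard as necessary but not sufficient and leaves the terminal status to the backend-core heartbeat confirmation.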
@@ -1843,12 +1919,16 @@ async function runProviderUpgrade(taskId: string, payload: Record<string, JsonVa
setTimeout(() => enterUpgradeSleep(sleepMs), 250).unref?.();
return {
mode,
message: "provider gateway upgrade scheduled with sleep-and-validate rollback guard",
message: "provider gateway upgrade scheduled; backend-core must wait for final promoted compose heartbeat before terminal success",
providerId: config.providerId,
providerName: config.providerName,
providerGatewayVersion: gatewayMetadata.version,
targetProviderGatewayVersion: (plan.targetProviderGatewayVersion as string | undefined) ?? gatewayMetadata.version,
updaterContainerId: result.stdout.trim(),
terminalStatusManagedByBackendCore: true,
candidateValidationIsNotTerminalSuccess: true,
finalPromotedHeartbeatConfirmation: plan.finalPromotedHeartbeatConfirmation,
finalPromotedContainer: plan.finalPromotedContainer,
plan,
stderr: result.stderr.slice(0, 500),
};
@@ -2145,7 +2225,15 @@ async function handleDispatch(message: CoreDispatchMessage): Promise<void> {
}
if (message.command === "provider.upgrade") {
const result = await runProviderUpgrade(message.taskId, message.payload);
await sendTaskStatus(message.taskId, "succeeded", "provider upgrade command completed", result);
const scheduleMode = message.payload.mode === "schedule";
await sendTaskStatus(
message.taskId,
scheduleMode ? "running" : "succeeded",
scheduleMode
? "provider upgrade scheduled; awaiting backend-core final promoted heartbeat"
: "provider upgrade command completed",
result,
);
return;
}
if (message.command === "host.ssh") {