diff --git a/docs/reference/provider-gateway.md b/docs/reference/provider-gateway.md index 91a08784..aaeeaf72 100644 --- a/docs/reference/provider-gateway.md +++ b/docs/reference/provider-gateway.md @@ -10,7 +10,7 @@ Provider Gateway 是计算节点侧容器。它只主动连出到主 server 暴 计算节点 `provider-gateway` 容器的重建和升级权威路径是 `provider.upgrade` 的 `mode: "schedule"`,或 frontend 中等价的显式升级调度。该路径由在线 provider 通过本地 Docker socket 启动 detached updater 容器,让升级动作脱离当前 WebSocket 与 SSH 透传会话的生命周期;重建目标只能是 `provider-gateway` service,并且必须带 `--no-deps` 与 `--force-recreate`,不得牵连 database、backend-core、frontend 或业务用户服务,也不得因为镜像 tag 未变而 no-op。 -远程升级必须采用 sleep-and-validate 回滚保护:旧 gateway 在成功调度 updater 后关闭当前 WebSocket 并进入最长 5 分钟的助眠期;updater 先构建新镜像,再用旧容器的环境变量、挂载、网络和 `extra_hosts` 拉起候选 gateway;候选 gateway 必须在日志中出现 `connect_open` 和 register ack 成功,才允许删除旧 service 容器和候选容器,并用原 Compose service 重新创建最终 provider-gateway 容器。最终容器必须重新验证 `restart=always` 与 `pid=host`,避免候选临时容器命名或端口映射残留成第二条运行路径。候选验证失败时 updater 必须删除候选容器并退出失败,旧 gateway 到达助眠上限后自动重连主 server,形成自动回滚。backend-core 必须在同一 Provider ID 被新 WebSocket 替换后忽略旧 WebSocket 的 close 事件,避免候选已上线后又被旧连接关闭标记为 offline。 +远程升级必须采用 sleep-and-validate 回滚保护:旧 gateway 在成功调度 updater 后关闭当前 WebSocket 并进入最长 5 分钟的助眠期;updater 先构建新镜像,再用旧容器的环境变量、挂载、网络和 `extra_hosts` 拉起候选 gateway;候选 gateway 必须在日志中出现 `connect_open` 和 register ack 成功,才允许删除旧 service 容器和候选容器,并用原 Compose service 重新创建最终 provider-gateway 容器。候选验证成功不是升级终态;最终 Compose 容器必须本地重新验证 `restart=always`、`pid=host` 和 backend-core reconnect,并且 backend-core 还必须看到同一 Provider ID、目标 `providerGatewayVersion`、非 candidate 容器名、足够新的真实 heartbeat(`providerGatewayHeartbeatAt`)后,`provider.upgrade` 任务才能进入 `succeeded`。最终容器未在窗口内重连时,updater 应优先恢复 last-known-good 容器,backend-core 必须把任务写成结构化 failure/blocker,不能把 candidate 曾经成功当作最终成功。backend-core 必须在同一 Provider ID 被新 WebSocket 替换后忽略旧 WebSocket 的 close 事件,避免候选已上线后又被旧连接关闭标记为 offline。 禁止通过 UniDesk 自己的 Host SSH / WSL SSH 透传同步执行 `docker compose up -d --build provider-gateway`、`docker compose restart provider-gateway`、`docker rm -f ` 后再启动等自重建命令。原因是这条 SSH 透传连接正由被重建的旧 `provider-gateway` 容器承载;旧容器停止后会切断控制通道,可能把节点留在旧容器已停、新容器未起的不可达状态。SSH 透传只允许用于诊断、修复升级前置条件、查看本地状态和升级后验证,不允许作为计算节点 `provider-gateway` 正式重建/升级通道。 @@ -128,11 +128,13 @@ provider-gateway 连接成功后必须周期性上报节点 CPU、内存、硬 backend-core 可以通过真实 WebSocket 调度向在线 provider 下发 `provider.upgrade`。`mode: "plan"` 只返回升级计划,用于 E2E 和人工预检;`mode: "schedule"` 会要求 provider-gateway 通过本地 Docker socket 启动一个 detached updater 容器。updater 的固定策略是先执行 `docker compose build provider-gateway`,构建成功后只按 `com.docker.compose.project` 与 `com.docker.compose.service=provider-gateway` label 删除旧 provider-gateway 容器,最后执行 `docker compose up -d --no-deps --force-recreate provider-gateway`。`--no-deps` 是强制要求,`--force-recreate` 用于保证 provider 重新注册能力标签并避免 compose no-op;升级 provider-gateway 时不得重建或停止 database、backend-core、frontend。对 D518、D601 这类计算节点,`mode: "schedule"` 是正式重建/升级 `provider-gateway` 容器的唯一标准路径;升级执行路径使用 Docker socket 和只读仓库挂载,不使用 Host SSH 维护桥作为自动调度通道。 -使用 compact build context 的节点必须把 context 视为派生缓存,不得让它成为版本真相。D601 这类节点可能让 Compose 从 `.state/provider--build-context` 构建兜底镜像,以便复用本地基础镜像和缩小构建输入;`provider.upgrade mode=schedule` 在 `docker compose build` 前必须从只读 `/workspace/src/components/provider-gateway` 与 `/workspace/src/components/shared` 刷新该 context,并把宿主 `.state` 以可写方式只挂给 detached updater。验收不能只看 updater 日志中的 `Built` 或 `candidate provider-gateway validated and promoted`,必须以主 server 可观测的 `providerGatewayVersion`、`unideskCapabilities` 和目标能力字段为准;如果线上仍上报旧版本,说明运行镜像没有真正更新,必须先修复构建 context 再重跑标准升级。 +使用 compact build context 的节点必须把 context 视为派生缓存,不得让它成为版本真相。D601 这类节点可能让 Compose 从 `.state/provider--build-context` 构建兜底镜像,以便复用本地基础镜像和缩小构建输入;`provider.upgrade mode=schedule` 在 `docker compose build` 前必须从只读 `/workspace/src/components/provider-gateway` 与 `/workspace/src/components/shared` 刷新该 context,并把宿主 `.state` 以可写方式只挂给 detached updater。验收不能只看 updater 日志中的 `Built`、candidate reconnect 或本地 promote 文本,必须以 backend-core 任务结果中最终非 candidate Compose 容器的 `providerGatewayVersion`、`unideskCapabilities`、`containerId`、`restartPolicy`、`pidMode` 和 `heartbeatTimestamp` 为准;如果线上仍上报旧版本,说明运行镜像没有真正更新,必须先修复构建 context 再重跑标准升级。 远程升级策略固定为 always-enabled:只要 provider-gateway 在线并声明 `provider.upgrade`,`mode: "schedule"` 就必须真正调度升级容器,不允许被 `PROVIDER_UPGRADE_ENABLED=false`、前端隐藏按钮或服务端特殊名单禁用。升级能力的安全边界不是开关,而是显式 `PROVIDER_UPGRADE_*` 配置、Docker socket 权限、只读仓库挂载、固定 Compose service 和 `--no-deps` 约束。升级计划中必须展示 `policy: "always-enabled"`、updater 容器名、runner image、workspace、Compose project/service、env file、compose file 和实际 `docker run` 命令,方便前端任务历史与 CLI debug 直接诊断。 -`mode: "schedule"` 的成功返回只代表 updater 已被调度,最终升级成败由候选 gateway 自验证决定。updater 必须先按 Compose 构建新镜像,再用旧容器的 `Config.Env` 生成候选 env-file,并复用旧容器的 Docker socket、日志目录、SSH 私钥只读挂载、Compose 网络和 `extra_hosts`;候选容器启动时 restart policy 必须先是 `no`,并显式使用 `--pid host` 保持节点级进程资源采集,验证通过后才能改成 `always` 并删除旧容器。升级计划的 `replacementStrategy` 必须包含 `oldGatewaySleepMs`、`validationTimeoutMs`、`promoteOnlyAfterCandidateValidation`、`candidateRestartPolicyAfterPromotion: "always"`、`candidateUsesOldContainerEnvironment`、`candidateUsesOldContainerMounts`、`candidateUsesOldContainerNetworks`、`candidateUsesOldContainerExtraHosts` 和 `candidateUsesHostPidNamespace`,并且必须在 plan 中显示指定 Provider 的当前/目标 gateway 版本号,便于前端和 CLI 判断这不是旧的先删旧容器再 up 的危险流程。 +`mode: "schedule"` 调度成功后,provider-gateway 只能把任务推进到 `running`,不能返回终态成功;backend-core 必须接管最终终态判定。updater 必须先按 Compose 构建新镜像,再用旧容器的 `Config.Env` 生成候选 env-file,并复用旧容器的 Docker socket、日志目录、SSH 私钥只读挂载、Compose 网络和 `extra_hosts`;候选容器启动时 restart policy 必须先是 `no`,并显式使用 `--pid host` 保持节点级进程资源采集。candidate 通过日志级 register ack 后仍只是 promotion 前置条件;最终成功必须等 `docker compose up -d --no-deps --force-recreate provider-gateway` 创建出的非 candidate Compose 容器向 backend-core 上报目标版本 heartbeat。升级计划的 `replacementStrategy` 必须包含 `oldGatewaySleepMs`、`validationTimeoutMs`、`finalPromotedReconnectTimeoutMs`、`finalPromotedHeartbeatTimeoutMs`、`promoteOnlyAfterCandidateValidation`、`candidateValidationIsNotTerminalSuccess`、`finalPromotedComposeHeartbeatRequired`、`candidateUsesOldContainerEnvironment`、`candidateUsesOldContainerMounts`、`candidateUsesOldContainerNetworks`、`candidateUsesOldContainerExtraHosts` 和 `candidateUsesHostPidNamespace`,并且必须在 plan 中显示指定 Provider 的当前/目标 gateway 版本号,便于前端和 CLI 判断这不是旧的先删旧容器再 up 的危险流程。 + +`provider.upgrade` 的终态 result 必须包含最终 Compose 容器的 `containerId`、`version`、`restartPolicy`、`pidMode` 和 `heartbeatTimestamp`;字段不可得时必须给出稳定的 `*UnavailableReason`,不能省略或用空字符串伪装成功。若 backend-core 在窗口内只观察到 candidate 容器 heartbeat、旧版本 heartbeat、离线状态或缺失 heartbeat,任务必须 `failed`,`reason` 使用结构化 blocker 语义,并包含 last-known-good/rollback 语义,提示以旧容器恢复或回滚为优先处置。 远程更新记录的权威来源是 backend-core 保存的 `provider.upgrade` 任务历史,而不是 provider-gateway 容器日志文件。frontend 必须按 Provider 聚合这些任务,并把状态、模式、task id、来源、耗时、策略、updater 容器摘要、失败原因和更新时间渲染为表格或卡片;完整 task/result JSON 只能由操作员点击 `查看原始JSON` 后查看。 @@ -152,7 +154,7 @@ D601 这类长期 WSL provider 不得因为单一路径失败被直接写成全 如果节点已有专用 Compose,优先用节点本地 Compose 手动重建一次:`docker compose --env-file .state/provider-.env -f -p up -d --no-deps --build --force-recreate provider-gateway`。这条命令必须在节点本地终端、节点自有 Web terminal、系统计划任务或 detached shell 中执行;不得通过正在被重建的 UniDesk provider-gateway 自己提供的 SSH 透传同步执行,否则旧 provider 容器停止时会切断 SSH client,可能导致重建中断在旧容器已停、新容器未起的状态。若只能通过 UniDesk 触达该节点,必须使用 `provider.upgrade mode=schedule` 的 detached updater,或先用节点本地 `nohup`/systemd 启动一个不依赖当前 provider 容器生命周期的重建脚本。老版 `docker-compose` 可能在重建已存在容器时因为 `ContainerConfig` 兼容问题失败;此时只能移除目标 provider-gateway 容器后重新 `up -d --no-deps provider-gateway`,不得执行 `down -v`、`docker volume rm` 或任何会影响 database 命名卷的命令。如果节点当前只有 `docker run` 部署,则先构建镜像 `docker build -f src/components/provider-gateway/Dockerfile -t unidesk_provider-gateway: .`,再以固定容器名重建:使用 `--restart always --pid host`,挂载 `/var/run/docker.sock:/var/run/docker.sock`、`/home/ubuntu/unidesk:/workspace:ro`、节点日志目录到 `/var/log/unidesk`,如需 WSL SSH 维护桥还要把只读私钥目录挂载到 `/run/host-ssh`,并使用同一个 `.state/provider-.env` 启动。无论 Compose 还是 `docker run`,容器名和镜像 tag 都必须带 Provider ID,便于 Docker 状态页、进程资源表、任务历史和节点本地排障互相对应。 -手动升级完成后的判定标准固定为主 server 可观测结果,而不是节点容器 `running`:访问公网 frontend `http://74.48.78.17:18081/`,确认该 Provider 在线;随后在任意装有本仓库且 `config.json` 含正确 frontend 登录凭据的计算节点上执行 `bun scripts/cli.ts --main-server-ip 74.48.78.17 debug dispatch provider.upgrade --mode schedule --wait-ms 15000`,确认任务 `succeeded` 且 result 包含 updater 容器信息;最后再次查看 frontend 或执行 `bun scripts/cli.ts --main-server-ip 74.48.78.17 debug health`,确认节点重连、指标恢复、labels 中 `host.ssh` 能力存在。每个 provider-gateway 手动升级后都必须用 remote CLI 再执行 `bun scripts/cli.ts --main-server-ip 74.48.78.17 debug dispatch host.ssh --wait-ms 15000` 和 `bun scripts/cli.ts --main-server-ip 74.48.78.17 ssh hostname`,验证维护桥没有在升级后丢失;该 remote CLI 默认走公网 frontend,不需要指定 `--main-server-key`。 +手动升级完成后的判定标准固定为主 server 可观测结果,而不是节点容器 `running`:访问公网 frontend `http://74.48.78.17:18081/`,确认该 Provider 在线;随后在任意装有本仓库且 `config.json` 含正确 frontend 登录凭据的计算节点上执行 `bun scripts/cli.ts --main-server-ip 74.48.78.17 debug dispatch provider.upgrade --mode schedule --wait-ms 300000`,确认任务 `succeeded` 且 result 包含最终 Compose 容器 `containerId`、目标 gateway `version`、`restartPolicy=always`、`pidMode=host` 和新的 `heartbeatTimestamp`;最后再次查看 frontend 或执行 `bun scripts/cli.ts --main-server-ip 74.48.78.17 debug health`,确认节点重连、指标恢复、labels 中 `host.ssh` 能力存在。每个 provider-gateway 手动升级后都必须用 remote CLI 再执行 `bun scripts/cli.ts --main-server-ip 74.48.78.17 debug dispatch host.ssh --wait-ms 15000` 和 `bun scripts/cli.ts --main-server-ip 74.48.78.17 ssh hostname`,验证维护桥没有在升级后丢失;该 remote CLI 默认走公网 frontend,不需要指定 `--main-server-key`。 ## Host SSH Maintenance Bridge diff --git a/src/components/backend-core/src/provider_registry.rs b/src/components/backend-core/src/provider_registry.rs index e2afe3e0..a4fdb07a 100644 --- a/src/components/backend-core/src/provider_registry.rs +++ b/src/components/backend-core/src/provider_registry.rs @@ -3,12 +3,13 @@ use std::sync::Arc; use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade}; use axum::http::{HeaderMap, Uri}; use axum::response::Response; +use chrono::{DateTime, Duration, Utc}; use futures_util::{SinkExt, StreamExt}; use serde_json::{json, Value}; use tokio::sync::mpsc; use crate::db::{ - record_event, update_provider_heartbeat, upsert_docker_status, upsert_provider_node, + raw_task, record_event, update_provider_heartbeat, upsert_docker_status, upsert_provider_node, upsert_system_status, }; use crate::egress_tcp::{handle_egress_tcp_close, handle_egress_tcp_data, handle_egress_tcp_open}; @@ -17,6 +18,513 @@ use crate::json_util::compact_json_for_storage; use crate::ssh_bridge::forward_ssh_provider_message; use crate::state::{AppState, HttpTunnelResponse, ProviderConnection}; use crate::task_dispatcher::{is_terminal_task_status, notify_task_terminal}; +use crate::types::RawTaskRow; + +const PROVIDER_UPGRADE_FINAL_HEARTBEAT_TIMEOUT_MS: i64 = 240_000; +const PROVIDER_UPGRADE_FINAL_HEARTBEAT_POLL_MS: u64 = 2_000; + +#[derive(Clone, Debug)] +struct ProviderUpgradeFinalHeartbeatExpectation { + task_id: String, + provider_id: String, + target_version: String, + min_heartbeat_at: DateTime, + deadline_at: DateTime, + scheduler_result: Value, + candidate_name: Option, + last_known_good_version: Option, +} + +#[derive(Clone, Debug)] +struct ProviderUpgradeNodeSnapshot { + provider_id: String, + status: String, + labels: Value, +} + +#[derive(Clone, Debug)] +enum ProviderUpgradeFinalHeartbeatDecision { + Pending(Value), + Succeeded(Value), + Failed(Value), +} + +fn json_string_at(value: &Value, path: &[&str]) -> Option { + let mut current = value; + for part in path { + current = current.get(*part)?; + } + current + .as_str() + .filter(|text| !text.is_empty()) + .map(ToOwned::to_owned) +} + +fn provider_upgrade_label_string(labels: &Value, key: &str) -> Option { + labels + .get(key) + .and_then(Value::as_str) + .filter(|value| !value.is_empty()) + .map(ToOwned::to_owned) +} + +fn provider_upgrade_heartbeat_at(snapshot: &ProviderUpgradeNodeSnapshot) -> Option> { + provider_upgrade_label_string(&snapshot.labels, "providerGatewayHeartbeatAt").and_then( + |value| { + DateTime::parse_from_rfc3339(&value) + .ok() + .map(|parsed| parsed.with_timezone(&Utc)) + }, + ) +} + +fn provider_upgrade_schedule_task(task: &RawTaskRow) -> bool { + task.command == "provider.upgrade" + && task.payload.get("mode").and_then(Value::as_str) == Some("schedule") +} + +fn provider_upgrade_target_version(task: &RawTaskRow, scheduler_result: &Value) -> Option { + json_string_at(scheduler_result, &["targetProviderGatewayVersion"]) + .or_else(|| json_string_at(scheduler_result, &["plan", "targetProviderGatewayVersion"])) + .or_else(|| json_string_at(&task.payload, &["targetProviderGatewayVersion"])) +} + +fn provider_upgrade_last_known_good_version(scheduler_result: &Value) -> Option { + json_string_at(scheduler_result, &["providerGatewayVersion"]) + .or_else(|| json_string_at(scheduler_result, &["plan", "providerGatewayVersion"])) +} + +fn provider_upgrade_candidate_name(scheduler_result: &Value) -> Option { + json_string_at(scheduler_result, &["candidateName"]) + .or_else(|| json_string_at(scheduler_result, &["plan", "candidateName"])) +} + +fn provider_upgrade_expectation( + task: &RawTaskRow, + scheduler_result: &Value, + now: DateTime, +) -> Option { + let target_version = provider_upgrade_target_version(task, scheduler_result)?; + Some(ProviderUpgradeFinalHeartbeatExpectation { + task_id: task.id.clone(), + provider_id: task.provider_id.clone(), + target_version, + min_heartbeat_at: now, + deadline_at: now + Duration::milliseconds(PROVIDER_UPGRADE_FINAL_HEARTBEAT_TIMEOUT_MS), + scheduler_result: scheduler_result.clone(), + candidate_name: provider_upgrade_candidate_name(scheduler_result), + last_known_good_version: provider_upgrade_last_known_good_version(scheduler_result), + }) +} + +fn provider_upgrade_snapshot_is_candidate( + snapshot: &ProviderUpgradeNodeSnapshot, + candidate_name: Option<&str>, +) -> bool { + let container_name = + provider_upgrade_label_string(&snapshot.labels, "providerGatewayContainerName"); + if let (Some(container_name), Some(candidate_name)) = + (container_name.as_deref(), candidate_name) + { + if container_name == candidate_name { + return true; + } + } + container_name + .as_deref() + .is_some_and(|name| name.contains("-candidate-")) +} + +fn insert_json_field_with_unavailable_reason( + object: &mut serde_json::Map, + key: &str, + value: Option, + unavailable_reason: &str, +) { + if let Some(value) = value { + object.insert(key.to_string(), Value::String(value)); + } else { + object.insert(key.to_string(), Value::Null); + object.insert( + format!("{key}UnavailableReason"), + Value::String(unavailable_reason.to_string()), + ); + } +} + +fn provider_upgrade_final_container_json( + snapshot: Option<&ProviderUpgradeNodeSnapshot>, + unavailable_reason: &str, +) -> Value { + let labels = snapshot.map(|snapshot| &snapshot.labels); + let heartbeat = snapshot + .and_then(provider_upgrade_heartbeat_at) + .map(|value| value.to_rfc3339_opts(chrono::SecondsFormat::Millis, true)); + let mut object = serde_json::Map::new(); + insert_json_field_with_unavailable_reason( + &mut object, + "containerId", + labels + .and_then(|labels| provider_upgrade_label_string(labels, "providerGatewayContainerId")), + unavailable_reason, + ); + insert_json_field_with_unavailable_reason( + &mut object, + "containerName", + labels.and_then(|labels| { + provider_upgrade_label_string(labels, "providerGatewayContainerName") + }), + unavailable_reason, + ); + insert_json_field_with_unavailable_reason( + &mut object, + "version", + labels.and_then(|labels| provider_upgrade_label_string(labels, "providerGatewayVersion")), + unavailable_reason, + ); + insert_json_field_with_unavailable_reason( + &mut object, + "restartPolicy", + labels.and_then(|labels| { + provider_upgrade_label_string(labels, "providerGatewayRestartPolicy") + }), + unavailable_reason, + ); + insert_json_field_with_unavailable_reason( + &mut object, + "pidMode", + labels.and_then(|labels| provider_upgrade_label_string(labels, "providerGatewayPidMode")), + unavailable_reason, + ); + insert_json_field_with_unavailable_reason( + &mut object, + "heartbeatTimestamp", + heartbeat, + unavailable_reason, + ); + Value::Object(object) +} + +fn evaluate_provider_upgrade_final_heartbeat( + expectation: &ProviderUpgradeFinalHeartbeatExpectation, + snapshot: Option<&ProviderUpgradeNodeSnapshot>, + now: DateTime, +) -> ProviderUpgradeFinalHeartbeatDecision { + let candidate_snapshot = snapshot.is_some_and(|snapshot| { + provider_upgrade_snapshot_is_candidate(snapshot, expectation.candidate_name.as_deref()) + }); + let unavailable_reason = if candidate_snapshot { + "latest heartbeat came from candidate validation container, not final promoted compose service" + } else { + "final promoted compose heartbeat not observed" + }; + let final_container_snapshot = if candidate_snapshot { None } else { snapshot }; + if let Some(snapshot) = snapshot { + let observed_version = + provider_upgrade_label_string(&snapshot.labels, "providerGatewayVersion"); + let heartbeat_is_new = provider_upgrade_heartbeat_at(snapshot) + .is_some_and(|last| last >= expectation.min_heartbeat_at); + let same_provider = snapshot.provider_id.as_str() == expectation.provider_id.as_str(); + let online = snapshot.status == "online"; + if same_provider + && online + && observed_version.as_deref() == Some(expectation.target_version.as_str()) + && heartbeat_is_new + && !candidate_snapshot + { + let restart_policy = + provider_upgrade_label_string(&snapshot.labels, "providerGatewayRestartPolicy"); + let pid_mode = + provider_upgrade_label_string(&snapshot.labels, "providerGatewayPidMode"); + if restart_policy.as_deref() != Some("always") || pid_mode.as_deref() != Some("host") { + return ProviderUpgradeFinalHeartbeatDecision::Failed(json!({ + "ok": false, + "reason": "final-promoted-runtime-guard-failed", + "blocker": true, + "providerId": expectation.provider_id.as_str(), + "taskId": expectation.task_id.as_str(), + "targetProviderGatewayVersion": expectation.target_version.as_str(), + "finalPromotedContainer": provider_upgrade_final_container_json(Some(snapshot), "runtime guard heartbeat fields are incomplete"), + "rollback": { + "preferred": "last-known-good", + "required": true, + "lastKnownGoodProviderGatewayVersion": expectation.last_known_good_version.as_deref(), + "updaterRollbackExpected": true + }, + "candidateValidationWasTerminal": false, + "schedulerResult": expectation.scheduler_result.clone() + })); + } + return ProviderUpgradeFinalHeartbeatDecision::Succeeded(json!({ + "ok": true, + "reason": "final-promoted-heartbeat-confirmed", + "providerId": expectation.provider_id.as_str(), + "taskId": expectation.task_id.as_str(), + "targetProviderGatewayVersion": expectation.target_version.as_str(), + "finalPromotedContainer": provider_upgrade_final_container_json(Some(snapshot), "not reported by provider heartbeat"), + "heartbeat": { + "minHeartbeatAt": expectation.min_heartbeat_at.to_rfc3339_opts(chrono::SecondsFormat::Millis, true), + "observedAt": provider_upgrade_heartbeat_at(snapshot).map(|value| value.to_rfc3339_opts(chrono::SecondsFormat::Millis, true)) + }, + "candidateValidationWasTerminal": false, + "schedulerResult": expectation.scheduler_result.clone() + })); + } + } + + let pending_result = json!({ + "ok": false, + "status": "running", + "reason": "awaiting-final-promoted-heartbeat", + "providerId": expectation.provider_id.as_str(), + "taskId": expectation.task_id.as_str(), + "targetProviderGatewayVersion": expectation.target_version.as_str(), + "deadlineAt": expectation.deadline_at.to_rfc3339_opts(chrono::SecondsFormat::Millis, true), + "candidateValidationWasTerminal": false, + "finalPromotedContainer": provider_upgrade_final_container_json(final_container_snapshot, unavailable_reason), + "schedulerResult": expectation.scheduler_result.clone() + }); + if now < expectation.deadline_at { + return ProviderUpgradeFinalHeartbeatDecision::Pending(pending_result); + } + + ProviderUpgradeFinalHeartbeatDecision::Failed(json!({ + "ok": false, + "reason": "final-promoted-heartbeat-timeout", + "error": "final promoted provider-gateway compose container did not report a target-version heartbeat before the guard window expired", + "blocker": true, + "providerId": expectation.provider_id.as_str(), + "taskId": expectation.task_id.as_str(), + "targetProviderGatewayVersion": expectation.target_version.as_str(), + "heartbeat": { + "minHeartbeatAt": expectation.min_heartbeat_at.to_rfc3339_opts(chrono::SecondsFormat::Millis, true), + "deadlineAt": expectation.deadline_at.to_rfc3339_opts(chrono::SecondsFormat::Millis, true), + "observedAt": snapshot.and_then(provider_upgrade_heartbeat_at).map(|value| value.to_rfc3339_opts(chrono::SecondsFormat::Millis, true)) + }, + "finalPromotedContainer": provider_upgrade_final_container_json(final_container_snapshot, unavailable_reason), + "rollback": { + "preferred": "last-known-good", + "required": true, + "lastKnownGoodProviderGatewayVersion": expectation.last_known_good_version.as_deref(), + "updaterRollbackExpected": true + }, + "candidateValidationWasTerminal": false, + "schedulerResult": expectation.scheduler_result.clone() + })) +} + +async fn provider_upgrade_node_snapshot( + state: &Arc, + provider_id: &str, +) -> anyhow::Result> { + if !state.db_ready() { + return Ok(None); + } + let client = state.pool.get().await?; + let row = client + .query_opt( + "SELECT provider_id, status, labels FROM unidesk_nodes WHERE provider_id = $1 LIMIT 1", + &[&provider_id], + ) + .await?; + Ok(row.map(|row| ProviderUpgradeNodeSnapshot { + provider_id: row.get("provider_id"), + status: row.get("status"), + labels: row.get("labels"), + })) +} + +async fn complete_provider_upgrade_task( + state: &Arc, + task_id: &str, + provider_id: &str, + status: &str, + result: Value, +) -> anyhow::Result<()> { + let client = state.pool.get().await?; + let updated = client + .execute( + r#" + UPDATE unidesk_tasks + SET status = $2, result = $3::jsonb, updated_at = now() + WHERE id = $1 + AND command = 'provider.upgrade' + AND status NOT IN ('succeeded', 'failed') + "#, + &[&task_id, &status, &result], + ) + .await?; + record_event( + state, + "provider_upgrade_final_heartbeat_result", + provider_id, + json!({ + "providerId": provider_id, + "taskId": task_id, + "status": status, + "result": result, + "updated": updated, + }), + ) + .await; + if updated > 0 && is_terminal_task_status(status) { + notify_task_terminal(state, task_id).await?; + } + Ok(()) +} + +async fn mark_provider_upgrade_awaiting_final_heartbeat( + state: &Arc, + expectation: &ProviderUpgradeFinalHeartbeatExpectation, + pending_result: Value, +) -> anyhow::Result<()> { + let client = state.pool.get().await?; + client + .execute( + r#" + UPDATE unidesk_tasks + SET status = 'running', result = $2::jsonb, updated_at = now() + WHERE id = $1 + AND command = 'provider.upgrade' + AND status NOT IN ('succeeded', 'failed') + "#, + &[&expectation.task_id, &pending_result], + ) + .await?; + record_event( + state, + "provider_upgrade_final_heartbeat_wait", + &expectation.provider_id, + json!({ + "providerId": expectation.provider_id.as_str(), + "taskId": expectation.task_id.as_str(), + "targetProviderGatewayVersion": expectation.target_version.as_str(), + "deadlineAt": expectation.deadline_at.to_rfc3339_opts(chrono::SecondsFormat::Millis, true), + "candidateValidationWasTerminal": false, + }), + ) + .await; + Ok(()) +} + +async fn monitor_provider_upgrade_final_heartbeat( + state: Arc, + expectation: ProviderUpgradeFinalHeartbeatExpectation, +) -> anyhow::Result<()> { + loop { + let snapshot = provider_upgrade_node_snapshot(&state, &expectation.provider_id).await?; + match evaluate_provider_upgrade_final_heartbeat(&expectation, snapshot.as_ref(), Utc::now()) + { + ProviderUpgradeFinalHeartbeatDecision::Succeeded(result) => { + complete_provider_upgrade_task( + &state, + &expectation.task_id, + &expectation.provider_id, + "succeeded", + result, + ) + .await?; + break; + } + ProviderUpgradeFinalHeartbeatDecision::Failed(result) => { + complete_provider_upgrade_task( + &state, + &expectation.task_id, + &expectation.provider_id, + "failed", + result, + ) + .await?; + break; + } + ProviderUpgradeFinalHeartbeatDecision::Pending(_) => { + tokio::time::sleep(std::time::Duration::from_millis( + PROVIDER_UPGRADE_FINAL_HEARTBEAT_POLL_MS, + )) + .await; + } + } + } + Ok(()) +} + +async fn ensure_provider_upgrade_final_heartbeat_watcher( + state: &Arc, + expectation: ProviderUpgradeFinalHeartbeatExpectation, +) { + let task_id = expectation.task_id.clone(); + let inserted = { + let mut active = state.active_provider_upgrade_watchers.lock().await; + active.insert(task_id.clone()) + }; + if !inserted { + return; + } + let state_for_task = state.clone(); + tokio::spawn(async move { + let provider_id = expectation.provider_id.clone(); + if let Err(error) = + monitor_provider_upgrade_final_heartbeat(state_for_task.clone(), expectation).await + { + state_for_task.log( + "error", + "provider_upgrade_final_heartbeat_watcher_failed", + Some(json!({ "taskId": task_id.as_str(), "providerId": provider_id.as_str(), "error": error.to_string() })), + ); + } + let mut active = state_for_task.active_provider_upgrade_watchers.lock().await; + active.remove(&task_id); + }); +} + +async fn maybe_handle_provider_upgrade_schedule_status( + state: &Arc, + provider_id: &str, + task_id: &str, + status: &str, + stored_result: &Value, +) -> anyhow::Result { + let starts_final_guard = status == "succeeded" + || (status == "running" + && stored_result.get("mode").and_then(Value::as_str) == Some("schedule") + && stored_result + .get("terminalStatusManagedByBackendCore") + .and_then(Value::as_bool) + == Some(true)); + if !starts_final_guard { + return Ok(false); + } + let Some(task) = raw_task(state, task_id).await? else { + return Ok(false); + }; + if !provider_upgrade_schedule_task(&task) { + return Ok(false); + } + let now = Utc::now(); + let Some(expectation) = provider_upgrade_expectation(&task, stored_result, now) else { + let result = json!({ + "ok": false, + "reason": "target-provider-gateway-version-unavailable", + "error": "provider.upgrade mode=schedule cannot be finalized because targetProviderGatewayVersion was not reported", + "blocker": true, + "providerId": provider_id, + "taskId": task_id, + "finalPromotedContainer": provider_upgrade_final_container_json(None, "target gateway version unavailable before final heartbeat guard"), + "candidateValidationWasTerminal": false, + "schedulerResult": stored_result + }); + complete_provider_upgrade_task(state, task_id, provider_id, "failed", result).await?; + return Ok(true); + }; + let pending = match evaluate_provider_upgrade_final_heartbeat(&expectation, None, now) { + ProviderUpgradeFinalHeartbeatDecision::Pending(result) => result, + ProviderUpgradeFinalHeartbeatDecision::Succeeded(result) + | ProviderUpgradeFinalHeartbeatDecision::Failed(result) => result, + }; + mark_provider_upgrade_awaiting_final_heartbeat(state, &expectation, pending).await?; + ensure_provider_upgrade_final_heartbeat_watcher(state, expectation).await; + Ok(true) +} pub async fn provider_ws( state: Arc, @@ -202,7 +710,16 @@ async fn handle_provider_text( let _ = connection.sender.send(Message::Text(json!({ "type": "ack", "requestId": "register", "ok": true, "message": "registered" }).to_string())); } "heartbeat" => { - let labels = message.get("labels").cloned().unwrap_or_else(|| json!({})); + let mut labels = message.get("labels").cloned().unwrap_or_else(|| json!({})); + if let Some(object) = labels.as_object_mut() { + object.insert( + "providerGatewayHeartbeatAt".to_string(), + message + .get("at") + .cloned() + .unwrap_or_else(|| Value::String(Utc::now().to_rfc3339())), + ); + } update_provider_heartbeat(state, &provider_id, &labels).await?; state.log( "debug", @@ -241,6 +758,17 @@ async fn handle_provider_text( compact_json_for_storage(&message.get("result").cloned().unwrap_or_else( || json!({ "message": message.get("message").cloned().unwrap_or(Value::Null) }), )); + if maybe_handle_provider_upgrade_schedule_status( + state, + &provider_id, + task_id, + status, + &stored_result, + ) + .await? + { + return Ok(()); + } let client = state.pool.get().await?; client.execute( r#" @@ -339,3 +867,122 @@ pub async fn mark_stale_providers_offline(state: &Arc) -> anyhow::Resu } Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + + fn dt(value: &str) -> DateTime { + DateTime::parse_from_rfc3339(value) + .expect("valid fixture timestamp") + .with_timezone(&Utc) + } + + fn expectation() -> ProviderUpgradeFinalHeartbeatExpectation { + ProviderUpgradeFinalHeartbeatExpectation { + task_id: "task_upgrade_contract".to_string(), + provider_id: "D601".to_string(), + target_version: "0.2.27".to_string(), + min_heartbeat_at: dt("2026-05-24T10:00:00Z"), + deadline_at: dt("2026-05-24T10:04:00Z"), + scheduler_result: json!({ + "mode": "schedule", + "providerGatewayVersion": "0.2.26", + "targetProviderGatewayVersion": "0.2.27", + "plan": { + "candidateName": "unidesk-provider-gateway-D601-candidate-task-upgrade-contract" + } + }), + candidate_name: Some( + "unidesk-provider-gateway-D601-candidate-task-upgrade-contract".to_string(), + ), + last_known_good_version: Some("0.2.26".to_string()), + } + } + + #[test] + fn candidate_validation_heartbeat_is_not_terminal_success() { + let snapshot = ProviderUpgradeNodeSnapshot { + provider_id: "D601".to_string(), + status: "online".to_string(), + labels: json!({ + "providerGatewayVersion": "0.2.27", + "providerGatewayContainerId": "candidate-container-id", + "providerGatewayContainerName": "unidesk-provider-gateway-D601-candidate-task-upgrade-contract", + "providerGatewayRestartPolicy": "no", + "providerGatewayPidMode": "host", + "providerGatewayHeartbeatAt": "2026-05-24T10:01:00Z" + }), + }; + let decision = evaluate_provider_upgrade_final_heartbeat( + &expectation(), + Some(&snapshot), + dt("2026-05-24T10:05:00Z"), + ); + let ProviderUpgradeFinalHeartbeatDecision::Failed(result) = decision else { + panic!("candidate heartbeat must fail after final promoted heartbeat timeout"); + }; + assert_eq!( + result.get("reason").and_then(Value::as_str), + Some("final-promoted-heartbeat-timeout") + ); + assert_eq!( + result + .get("candidateValidationWasTerminal") + .and_then(Value::as_bool), + Some(false) + ); + assert_eq!( + result + .pointer("/rollback/preferred") + .and_then(Value::as_str), + Some("last-known-good") + ); + assert_eq!( + result + .pointer("/finalPromotedContainer/containerIdUnavailableReason") + .and_then(Value::as_str), + Some("latest heartbeat came from candidate validation container, not final promoted compose service") + ); + } + + #[test] + fn final_promoted_compose_heartbeat_is_success() { + let snapshot = ProviderUpgradeNodeSnapshot { + provider_id: "D601".to_string(), + status: "online".to_string(), + labels: json!({ + "providerGatewayVersion": "0.2.27", + "providerGatewayContainerId": "final-container-id", + "providerGatewayContainerName": "unidesk-provider-gateway-D601", + "providerGatewayRestartPolicy": "always", + "providerGatewayPidMode": "host", + "providerGatewayHeartbeatAt": "2026-05-24T10:02:00Z" + }), + }; + let decision = evaluate_provider_upgrade_final_heartbeat( + &expectation(), + Some(&snapshot), + dt("2026-05-24T10:02:01Z"), + ); + let ProviderUpgradeFinalHeartbeatDecision::Succeeded(result) = decision else { + panic!("final compose heartbeat should succeed"); + }; + assert_eq!( + result.get("reason").and_then(Value::as_str), + Some("final-promoted-heartbeat-confirmed") + ); + assert_eq!( + result + .pointer("/finalPromotedContainer/containerId") + .and_then(Value::as_str), + Some("final-container-id") + ); + assert_eq!( + result + .pointer("/finalPromotedContainer/heartbeatTimestamp") + .and_then(Value::as_str), + Some("2026-05-24T10:02:00.000Z") + ); + } +} diff --git a/src/components/backend-core/src/state.rs b/src/components/backend-core/src/state.rs index 393fe8a7..8a70a5f0 100644 --- a/src/components/backend-core/src/state.rs +++ b/src/components/backend-core/src/state.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::str::FromStr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -51,6 +51,7 @@ pub struct AppState { pub request_performance_samples: Mutex>, pub operation_performance_samples: Mutex>, pub active_scheduled_runs: Mutex>, + pub active_provider_upgrade_watchers: Mutex>, } impl AppState { @@ -84,6 +85,7 @@ impl AppState { request_performance_samples: Mutex::new(Vec::new()), operation_performance_samples: Mutex::new(Vec::new()), active_scheduled_runs: Mutex::new(std::collections::HashSet::new()), + active_provider_upgrade_watchers: Mutex::new(HashSet::new()), }) } diff --git a/src/components/microservices/code-queue/Dockerfile b/src/components/microservices/code-queue/Dockerfile index 899307dd..c035409b 100644 --- a/src/components/microservices/code-queue/Dockerfile +++ b/src/components/microservices/code-queue/Dockerfile @@ -3,8 +3,11 @@ FROM ${CODE_QUEUE_BASE_IMAGE} ENV PLAYWRIGHT_BROWSERS_PATH=/ms-playwright ENV UNIDESK_SKILLS_PATH=/root/.agents/skills +ENV RUSTUP_HOME=/usr/local/rustup +ENV CARGO_HOME=/usr/local/cargo +ENV PATH=/usr/local/cargo/bin:${PATH} -RUN (command -v codex >/dev/null 2>&1 && command -v opencode >/dev/null 2>&1 && command -v docker >/dev/null 2>&1 && command -v gh >/dev/null 2>&1 && command -v rg >/dev/null 2>&1 && command -v cargo >/dev/null 2>&1 && command -v rustc >/dev/null 2>&1 && command -v rustfmt >/dev/null 2>&1 && command -v xvfb-run >/dev/null 2>&1 && command -v xauth >/dev/null 2>&1 && test -x "$PLAYWRIGHT_BROWSERS_PATH/chromium_headless_shell-1217/chrome-headless-shell-linux64/chrome-headless-shell") \ +RUN (command -v codex >/dev/null 2>&1 && command -v opencode >/dev/null 2>&1 && command -v docker >/dev/null 2>&1 && command -v gh >/dev/null 2>&1 && command -v rg >/dev/null 2>&1 && command -v cargo >/dev/null 2>&1 && command -v rustc >/dev/null 2>&1 && rustc --version | awk '{ split($2, v, "."); exit ! (v[1] > 1 || (v[1] == 1 && v[2] >= 86)) }' && command -v rustfmt >/dev/null 2>&1 && command -v xvfb-run >/dev/null 2>&1 && command -v xauth >/dev/null 2>&1 && test -x "$PLAYWRIGHT_BROWSERS_PATH/chromium_headless_shell-1217/chrome-headless-shell-linux64/chrome-headless-shell") \ || (apt-get update \ && apt-get install -y --no-install-recommends \ bash \ @@ -42,6 +45,8 @@ RUN (command -v codex >/dev/null 2>&1 && command -v opencode >/dev/null 2>&1 && xauth \ xvfb \ xz-utils \ + && curl https://sh.rustup.rs -sSf | sh -s -- -y --profile minimal --default-toolchain stable \ + && rustup component add rustfmt \ && mkdir -p /usr/local/lib/docker/cli-plugins /root/.docker/cli-plugins \ && ln -sf /usr/bin/docker-compose /usr/local/lib/docker/cli-plugins/docker-compose \ && ln -sf /usr/bin/docker-compose /root/.docker/cli-plugins/docker-compose \ diff --git a/src/components/provider-gateway/package.json b/src/components/provider-gateway/package.json index 71c5f294..50f945cd 100644 --- a/src/components/provider-gateway/package.json +++ b/src/components/provider-gateway/package.json @@ -1,6 +1,6 @@ { "name": "@unidesk/provider-gateway", - "version": "0.2.26", + "version": "0.2.27", "private": true, "type": "module", "scripts": { diff --git a/src/components/provider-gateway/src/index.ts b/src/components/provider-gateway/src/index.ts index b1be2f68..c46cd4de 100644 --- a/src/components/provider-gateway/src/index.ts +++ b/src/components/provider-gateway/src/index.ts @@ -1671,7 +1671,24 @@ function upgradePlan(taskId: string): Record { const validationNeedleAck = `"requestId":"register"`; const validationNeedleOk = `"ok":true`; const validationAttempts = Math.max(1, Math.ceil(validationTimeoutMs / 2000)); + const finalPromotedReconnectTimeoutMs = 180_000; + const finalPromotedHeartbeatTimeoutMs = 240_000; + const finalValidationAttempts = Math.max(1, Math.ceil(finalPromotedReconnectTimeoutMs / 2000)); const targetGatewayMetadata = readTargetGatewayMetadata(workspace); + const unavailableFinalPromotedContainer = { + containerId: null, + containerIdUnavailableReason: "mode=plan or schedule has not yet observed backend-core final promoted heartbeat", + containerName: null, + containerNameUnavailableReason: "mode=plan or schedule has not yet observed backend-core final promoted heartbeat", + version: null, + versionUnavailableReason: "mode=plan or schedule has not yet observed backend-core final promoted heartbeat", + restartPolicy: null, + restartPolicyUnavailableReason: "mode=plan or schedule has not yet observed backend-core final promoted heartbeat", + pidMode: null, + pidModeUnavailableReason: "mode=plan or schedule has not yet observed backend-core final promoted heartbeat", + heartbeatTimestamp: null, + heartbeatTimestampUnavailableReason: "mode=plan or schedule has not yet observed backend-core final promoted heartbeat", + }; const compactBuildContextRefreshScript = [ `provider_src=${shellQuote(`${workspaceRoot}/src/components/provider-gateway`)}`, `shared_src=${shellQuote(`${workspaceRoot}/src/components/shared`)}`, @@ -1693,6 +1710,33 @@ function upgradePlan(taskId: string): Record { ` fi`, `done`, ].join("\n"); + const rollbackToLastKnownGoodScript = [ + `rollback_to_last_known_good() {`, + ` reason="$1"`, + ` echo "rollback_to_last_known_good: $reason" >&2`, + ` if [ -n "$final_container" ]; then docker rm -f "$final_container" >/dev/null 2>&1 || true; fi`, + ` docker rm -f ${shellQuote(candidateName)} >/dev/null 2>&1 || true`, + ` if [ -z "$old_name" ] || [ -z "$old_image" ]; then echo "rollback_to_last_known_good unavailable: old_name=$old_name old_image=$old_image" >&2; return 1; fi`, + ` docker rm -f "$old_name" >/dev/null 2>&1 || true`, + [ + ` docker run -d`, + `--name "$old_name"`, + `--restart always`, + `--pid "\${old_pid_mode:-host}"`, + `$network_arg`, + `--env-file "$candidate_env_file"`, + `--label ${shellQuote(`com.docker.compose.project=${config.upgradeComposeProject}`)}`, + `--label ${shellQuote(`com.docker.compose.service=${config.upgradeService}`)}`, + `--label ${shellQuote("com.docker.compose.oneoff=False")}`, + `--label ${shellQuote(`unidesk.upgrade.rollback=${taskId}`)}`, + `$mount_args`, + `$extra_host_args`, + `"$old_image" >/dev/null || return 1`, + ].join(" "), + ` if [ -n "$network_names" ]; then for net in $network_names; do if [ "$net" != "$first_network" ]; then docker network connect "$net" "$old_name" >/dev/null 2>&1 || true; fi; done; fi`, + ` echo "rollback_to_last_known_good restored old gateway container name=$old_name image=$old_image reason=$reason" >&2`, + `}`, + ].join("\n"); const script = [ "set -eu", `cd ${shellQuote(workspace)}`, @@ -1701,16 +1745,24 @@ function upgradePlan(taskId: string): Record { `first_old=""`, `for old_id in $old_ids; do first_old="$old_id"; break; done`, `old_name=""`, + `old_image=""`, + `old_restart=""`, + `old_pid_mode=""`, + `final_container=""`, `mount_args=""`, `extra_host_args=""`, `candidate_env_file=${shellQuote(`/tmp/${candidateName}.env`)}`, `network_names=""`, `first_network=""`, `network_arg=""`, + rollbackToLastKnownGoodScript, `if [ -n "$old_ids" ]; then docker update --restart always $old_ids >/dev/null 2>&1 || true; fi`, `if [ -z "$first_old" ]; then echo "no existing provider-gateway compose container found; cannot perform safe in-place upgrade" >&2; exit 1; fi`, `old_name=$(docker inspect --format '{{.Name}}' "$first_old")`, `old_name="\${old_name#/}"`, + `old_image=$(docker inspect --format '{{.Image}}' "$first_old")`, + `old_restart=$(docker inspect --format '{{.HostConfig.RestartPolicy.Name}}' "$first_old")`, + `old_pid_mode=$(docker inspect --format '{{.HostConfig.PidMode}}' "$first_old")`, `docker inspect --format '{{range .Config.Env}}{{println .}}{{end}}' "$first_old" > "$candidate_env_file"`, `mount_args=$(docker inspect --format '{{range .Mounts}}{{printf "-v %s:%s" .Source .Destination}}{{if not .RW}}{{printf ":ro"}}{{end}}{{printf "\\n"}}{{end}}' "$first_old")`, `extra_host_args=$(docker inspect --format '{{range .HostConfig.ExtraHosts}}{{printf "--add-host %s\\n" .}}{{end}}' "$first_old")`, @@ -1745,12 +1797,18 @@ function upgradePlan(taskId: string): Record { `docker rm -f ${shellQuote(candidateName)} >/dev/null 2>&1 || true`, composeUpCommand.map(shellQuote).join(" "), `final_container="$old_name"`, - `if [ -z "$final_container" ]; then final_container=$(${listServiceContainersCommand.map(shellQuote).join(" ")} | head -n 1); fi`, + `if [ -z "$final_container" ] || ! docker inspect "$final_container" >/dev/null 2>&1; then final_container=$(${listServiceContainersCommand.map(shellQuote).join(" ")} | head -n 1); fi`, + `if [ -z "$final_container" ]; then echo "final provider-gateway compose container not found after promotion" >&2; rollback_to_last_known_good "final-compose-container-missing" || true; rm -f "$candidate_env_file"; exit 1; fi`, + `final_container_id=$(docker inspect --format '{{.Id}}' "$final_container")`, `final_restart=$(docker inspect --format '{{.HostConfig.RestartPolicy.Name}}' "$final_container")`, `final_pid_mode=$(docker inspect --format '{{.HostConfig.PidMode}}' "$final_container")`, - `if [ "$final_restart" != "always" ] || [ "$final_pid_mode" != "host" ]; then echo "final provider-gateway runtime guard failed: restart=$final_restart pid=$final_pid_mode" >&2; exit 1; fi`, + `if [ "$final_restart" != "always" ] || [ "$final_pid_mode" != "host" ]; then echo "final provider-gateway runtime guard failed: restart=$final_restart pid=$final_pid_mode" >&2; rollback_to_last_known_good "final-runtime-guard-failed" || true; rm -f "$candidate_env_file"; exit 1; fi`, + `final_attempt=0`, + `final_validated=0`, + `while [ "$final_attempt" -lt "${finalValidationAttempts}" ]; do final_logs=$(docker logs "$final_container" 2>&1 || true); final_has_open=0; final_has_ack=0; final_has_ok=0; case "$final_logs" in *${shellQuote(validationNeedleOpen)}*) final_has_open=1;; esac; case "$final_logs" in *${shellQuote(validationNeedleAck)}*) final_has_ack=1;; esac; case "$final_logs" in *${shellQuote(validationNeedleOk)}*) final_has_ok=1;; esac; if [ "$final_has_open" = "1" ] && [ "$final_has_ack" = "1" ] && [ "$final_has_ok" = "1" ]; then final_validated=1; break; fi; final_running=$(docker inspect --format '{{.State.Running}}' "$final_container" 2>/dev/null || true); if [ "$final_running" != "true" ]; then break; fi; final_attempt=$((final_attempt + 1)); sleep 2; done`, + `if [ "$final_validated" != "1" ]; then echo "final provider-gateway compose container did not reconnect to backend-core before local guard timeout; backend-core must not treat candidate validation as success" >&2; docker logs "$final_container" >&2 || true; rollback_to_last_known_good "final-promoted-container-reconnect-timeout" || true; rm -f "$candidate_env_file"; exit 1; fi`, `rm -f "$candidate_env_file"`, - `echo "candidate provider-gateway validated and promoted"`, + `echo "candidate provider-gateway validated; final promoted compose container reconnected locally: container=$final_container id=$final_container_id restart=$final_restart pid=$final_pid_mode"`, ].join("\n"); const dockerRunCommand = [ "docker", @@ -1805,16 +1863,23 @@ function upgradePlan(taskId: string): Record { buildBeforeCandidate: true, oldGatewaySleepMs: sleepMs, validationTimeoutMs, + finalPromotedReconnectTimeoutMs, + finalPromotedHeartbeatTimeoutMs, oldGatewayRestartPolicyBeforeSleep: "always", promoteOnlyAfterCandidateValidation: true, - candidateRestartPolicyAfterPromotion: "always", - candidateFinalRestartPolicyValidation: true, - candidateUsesOldContainerMounts: true, - candidateUsesOldContainerNetworks: true, - candidateUsesOldContainerExtraHosts: true, - candidateUsesOldContainerEnvironment: true, - candidateUsesHostPidNamespace: true, - startupSelfHealsRestartPolicy: true, + candidateValidationIsNotTerminalSuccess: true, + finalPromotedComposeHeartbeatRequired: true, + finalPromotedContainerMustNotBeCandidate: true, + localRollbackToLastKnownGoodOnFinalReconnectFailure: true, + backendCoreOwnsFinalTerminalStatus: true, + candidateRestartPolicyAfterPromotion: "always", + candidateFinalRestartPolicyValidation: true, + candidateUsesOldContainerMounts: true, + candidateUsesOldContainerNetworks: true, + candidateUsesOldContainerExtraHosts: true, + candidateUsesOldContainerEnvironment: true, + candidateUsesHostPidNamespace: true, + startupSelfHealsRestartPolicy: true, dockerStatusReportsRestartPolicyAndPidMode: true, refreshesCompactBuildContextBeforeBuild: true, removeScope: { @@ -1825,6 +1890,17 @@ function upgradePlan(taskId: string): Record { namedVolumesPreserved: true, }, dockerRunCommand, + finalPromotedHeartbeatConfirmation: { + required: true, + providerId: config.providerId, + targetProviderGatewayVersion: targetGatewayMetadata.version, + backendCoreNodeRegistry: "/api/nodes", + heartbeatTimeoutMs: finalPromotedHeartbeatTimeoutMs, + finalContainerMustNotEqualCandidate: candidateName, + requiredResultFields: ["containerId", "version", "restartPolicy", "pidMode", "heartbeatTimestamp"], + unavailableResult: unavailableFinalPromotedContainer, + }, + finalPromotedContainer: unavailableFinalPromotedContainer, compactBuildContextCandidates, workspaceStateMount: `${config.upgradeHostProjectRoot.replace(/\/+$/, "")}/.state:${workspaceRoot}/.state`, }; @@ -1843,12 +1919,16 @@ async function runProviderUpgrade(taskId: string, payload: Record enterUpgradeSleep(sleepMs), 250).unref?.(); return { mode, - message: "provider gateway upgrade scheduled with sleep-and-validate rollback guard", + message: "provider gateway upgrade scheduled; backend-core must wait for final promoted compose heartbeat before terminal success", providerId: config.providerId, providerName: config.providerName, providerGatewayVersion: gatewayMetadata.version, targetProviderGatewayVersion: (plan.targetProviderGatewayVersion as string | undefined) ?? gatewayMetadata.version, updaterContainerId: result.stdout.trim(), + terminalStatusManagedByBackendCore: true, + candidateValidationIsNotTerminalSuccess: true, + finalPromotedHeartbeatConfirmation: plan.finalPromotedHeartbeatConfirmation, + finalPromotedContainer: plan.finalPromotedContainer, plan, stderr: result.stderr.slice(0, 500), }; @@ -2145,7 +2225,15 @@ async function handleDispatch(message: CoreDispatchMessage): Promise { } if (message.command === "provider.upgrade") { const result = await runProviderUpgrade(message.taskId, message.payload); - await sendTaskStatus(message.taskId, "succeeded", "provider upgrade command completed", result); + const scheduleMode = message.payload.mode === "schedule"; + await sendTaskStatus( + message.taskId, + scheduleMode ? "running" : "succeeded", + scheduleMode + ? "provider upgrade scheduled; awaiting backend-core final promoted heartbeat" + : "provider upgrade command completed", + result, + ); return; } if (message.command === "host.ssh") {