feat: close AgentRun commander task plane gaps

This commit is contained in:
Codex
2026-06-09 00:06:53 +08:00
parent 96c8283574
commit 2f5cf8b3d4
15 changed files with 411 additions and 23 deletions
+1 -1
View File
@@ -37,7 +37,7 @@ Secret 名称和 key 可以在实现时按 Kubernetes 命名限制微调,但
- `agentrun_runner_jobs`:手动 runner Job 的 idempotency key、payload hash、attempt/job identity 和创建响应。
- `agentrun_sessions`SessionRef 到 backend thread/cache identity、execution projection、active run/command、terminal/unread 水位的映射,不保存 credential 文件或 Secret 值。
- `agentrun_session_read_cursors`:按 reader 记录 Session 已读 version,用于 CLI 默认只看 running/unread。
- `agentrun_queue_tasks`AgentRun Queue task identity、queue/lane、tenant/project、priority、state、backendProfile、workspace/resource 引用和 version
- `agentrun_queue_tasks`AgentRun Queue task identity、queue/lane、tenant/project、priority、state、backendProfile、workspace/resource 引用、可选 `sessionRef` 和 version;携带 `sessionRef` 的 task dispatch 后必须把 `sessionPath` 写回 Queue 记录
- `agentrun_queue_attempts`task attempt identity、runId、commandId、runnerJobId、sessionId、state、failureKind、retry index 和 timestamps。
- `agentrun_task_summaries` / `agentrun_attempt_summaries`Queue 列表、详情和 commander 使用的轻量摘要;不能从 Core trace 反推 overview。
- `agentrun_queue_stats`:按 queue/lane/state/backendProfile 聚合的统计水位。
+2 -2
View File
@@ -107,7 +107,7 @@ Session 命令负责输出、trace 和会话控制:
Queue 首版新增或扩展的稳定表方向:
- `agentrun_queue_tasks`:任务 identity、tenant/project、queue/lane、title、priority、state、backendProfile、workspace/resource 引用、创建者、version 和 timestamps。
- `agentrun_queue_tasks`:任务 identity、tenant/project、queue/lane、title、priority、state、backendProfile、workspace/resource 引用、可选 `sessionRef`创建者、version 和 timestamps。
- `agentrun_queue_attempts`attempt identity、taskId、runId、commandId、runnerJobId、sessionId、state、failureKind、retry index 和 timestamps。
- `agentrun_task_summaries`:task 级摘要、当前状态、最新 attempt、最新 sessionPath、最后用户可见摘要和统计字段。
- `agentrun_attempt_summaries`attempt 级摘要、terminalStatus、failureKind、runner identity、耗时和有界输出摘要引用。
@@ -122,7 +122,7 @@ Queue 首版新增或扩展的稳定表方向:
| 旧 Code Queue 能力 | AgentRun 目标 | 吸收方式 | 不保留内容 |
| --- | --- | --- | --- |
| task 记录 | `agentrun_queue_tasks` | 直接建模为 Queue task | 旧 API 字段兼容 |
| task 记录 | `agentrun_queue_tasks` | 直接建模为 Queue task;需要 trace/reuse 时必须携带 `sessionRef` 并在 dispatch 后暴露 `sessionPath` | 旧 API 字段兼容 |
| queueId、move、merge | queue/lane 字段和 Queue API | 保留队列归类与移动语义 | 旧 UI 操作合同 |
| attempt | `agentrun_queue_attempts` + Core run/command/runner job | attempt 引用真实执行资源 | 旧 attempt 输出结构 |
| `processQueue` / `runTask` | AgentRun Scheduler | 由 Scheduler 扫描 pending task/attempt 并创建 runner job | UniDesk Code Queue scheduler |
@@ -168,6 +168,8 @@ HWLAB Workbench 的 project/workspace 不属于 RuntimeAssembly 四要素,也
runner 对 workspace `tools/` 做统一装配:顶层带 shebang 的脚本会被 `chmod +x``tools/` 目录会追加到 `PATH`。非 shebang 文件是随 bundle 复制的源码、测试或辅助文件,不作为可执行工具发现,也不触发 schema-invalid。短命令名称来自 repo 内真实文件,例如 `tools/hwpod`,不再由 runner 生成 wrapper。
AgentRun 自身仓库必须提供 `tools/tran``tools/trans`,用于承接 UniDesk frontend `/ws/ssh` 的 scoped client-token 透传。runner 只通过 `executionPolicy.secretScope.toolCredentials[]` 投影 `UNIDESK_SSH_CLIENT_TOKEN`,并通过 `transientEnv` 注入非敏感 `UNIDESK_MAIN_SERVER_IP` / `UNIDESK_FRONTEND_URL`;工具不得读取 provider token、主 server SSH key 或完整 frontend 登录态。`tran --help` 必须输出 JSON,并列出当前支持的最小开发面:host/host workspace `script``argv`、普通 ssh-like 命令、`k3s kubectl``k3s script` 和 k3s workload `argv/script`。未实现的 `apply-patch``upload``download` 和 Windows route 必须显式 `unsupported-operation`,不能静默改走不受控 shell 拼接或 token fallback。
#### promptRefs
`promptRefs` 用于按同一个 materialized gitbundle checkout 的 `path` 装配初始 prompt。它承载业务域稳定 runtime/developer instruction,例如某个项目的标准入口、禁止路径和工具使用纪律;它不承载用户本轮 message,也不承载历史会话。
+51 -7
View File
@@ -1,6 +1,6 @@
import { mkdir, readFile, rm, writeFile } from "node:fs/promises";
import { randomUUID } from "node:crypto";
import { spawn } from "node:child_process";
import { execFileSync, spawn } from "node:child_process";
import { closeSync, existsSync, openSync } from "node:fs";
import path from "node:path";
import { startManagerServer } from "../../src/mgr/server.js";
@@ -33,7 +33,10 @@ export async function runCli(argv: string[]): Promise<void> {
async function dispatch(args: ParsedArgs): Promise<JsonValue> {
const [group, command, id] = args.positional;
if (!group || group === "help") return help();
if (!group || group === "help" || group === "--help") return help(args);
if (args.flags.get("help") === true) return help(args, group);
if (command === "help" || command === "--help") return help(args, group);
if (group === "manager" && (command === "url" || command === "resolve-url" || command === "status")) return managerEndpoint(args);
if (group === "server" && command === "start") return startServer(args);
if (group === "server" && command === "status") return serverStatus(args);
if (group === "server" && command === "logs") return serverLogs(args);
@@ -556,7 +559,40 @@ function client(args: ParsedArgs): ManagerClient {
}
function managerUrl(args: ParsedArgs): string {
return optionalFlag(args, "manager-url") ?? process.env.AGENTRUN_MGR_URL ?? "http://127.0.0.1:8080";
const explicit = optionalFlag(args, "manager-url") ?? process.env.AGENTRUN_MGR_URL;
if (explicit && explicit !== "auto") return explicit;
if (explicit === "auto") return resolveRuntimeManagerUrl();
return "http://127.0.0.1:8080";
}
function managerEndpoint(args: ParsedArgs): JsonRecord {
const url = managerUrl(args);
const explicit = optionalFlag(args, "manager-url") ?? process.env.AGENTRUN_MGR_URL ?? null;
return {
action: "manager-endpoint",
managerUrl: url,
source: explicit === "auto" ? "auto" : explicit ? (optionalFlag(args, "manager-url") ? "--manager-url" : "AGENTRUN_MGR_URL") : "default-localhost",
runtimeNamespace: "agentrun-v01",
serviceName: "agentrun-mgr",
valuesPrinted: false,
examples: {
commander: "./scripts/agentrun --manager-url auto queue commander",
sessions: "./scripts/agentrun --manager-url auto sessions ps --state default --reader-id cli",
explicit: "./scripts/agentrun --manager-url http://<reachable-host>:8080 queue commander",
},
};
}
function resolveRuntimeManagerUrl(): string {
const fromEnv = process.env.AGENTRUN_RUNTIME_MANAGER_URL;
if (fromEnv) return fromEnv;
try {
const serviceIp = execFileSync("kubectl", ["-n", "agentrun-v01", "get", "svc", "agentrun-mgr", "-o", "jsonpath={.spec.clusterIP}"], { encoding: "utf8", stdio: ["ignore", "pipe", "ignore"] }).trim();
if (serviceIp.length > 0 && serviceIp !== "None") return `http://${serviceIp}:8080`;
} catch {
// Fall through to localhost so the following request produces a structured connection failure.
}
return "http://127.0.0.1:8080";
}
function portFromManagerUrl(args: ParsedArgs): string {
@@ -758,9 +794,8 @@ function cancelBody(args: ParsedArgs): JsonRecord {
return reason ? { reason } : {};
}
function help(): JsonRecord {
return {
commands: [
function help(args: ParsedArgs, group?: string): JsonRecord {
const commands = [
"runs create --json-file <run.json>",
"runs show <runId>",
"runs events <runId> --after-seq <n> --limit <n>",
@@ -808,7 +843,16 @@ function help(): JsonRecord {
"server status [--port <port>]",
"server logs [--port <port>] [--tail-bytes <bytes>] [--log-file <path>]",
"server stop [--port <port>]",
],
];
if (group) {
const prefix = `${group} `;
const groupCommands = commands.filter((item) => item === group || item.startsWith(prefix));
return { group, commands: groupCommands, commandCount: groupCommands.length, supported: groupCommands.length > 0, manager: managerEndpoint(args) };
}
return {
groups: [...new Set(commands.map((item) => item.split(" ")[0]).filter(Boolean))].sort(),
commands,
manager: managerEndpoint(args),
};
}
+1
View File
@@ -309,6 +309,7 @@ export interface CreateQueueTaskInput extends JsonRecord {
backendProfile: BackendProfile;
providerId: string | null;
workspaceRef: WorkspaceRef | null;
sessionRef: SessionRef | null;
executionPolicy: ExecutionPolicy | null;
resourceBundleRef: ResourceBundleRef | null;
payload: JsonRecord;
+1
View File
@@ -318,6 +318,7 @@ export function validateCreateQueueTask(input: unknown): CreateQueueTaskInput {
backendProfile: backendProfileValue,
providerId: optionalString(record.providerId) ?? null,
workspaceRef: record.workspaceRef === undefined || record.workspaceRef === null ? null : requiredRecord(record, "workspaceRef") as CreateQueueTaskInput["workspaceRef"],
sessionRef: validateSessionRef(record.sessionRef),
executionPolicy: record.executionPolicy === undefined || record.executionPolicy === null ? null : validateExecutionPolicy(requiredRecord(record, "executionPolicy")),
resourceBundleRef: validateResourceBundleRef(record.resourceBundleRef),
payload: record.payload === undefined ? {} : asRecord(record.payload, "payload"),
+19 -6
View File
@@ -6,7 +6,7 @@ import { redactJson } from "../common/redaction.js";
import type { BackendProfile, BackendTurnResult, CommandRecord, CommandState, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, EventType, FailureKind, JsonRecord, JsonValue, ListGcExpiredSessionsInput, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, RunStatus, SessionEventPage, SessionListResult, SessionReadCursorRecord, SessionRecord, SessionRef, SessionStoragePatch, SessionSummary, TerminalStatus, UpsertSessionInput } from "../common/types.js";
import { newId, nowIso, stableHash } from "../common/validation.js";
import type { AgentRunStore, ListQueueTasksInput, ListSessionsInput, SaveRunnerJobInput, SessionEventPageInput, StoreHealth, UpdateQueueTaskAttemptInput } from "./store.js";
import { assertSessionBoundary, buildQueueStats, buildSessionSummary, clampQueueLimit, clampSessionLimit, commandStateFromTerminal, isLeaseExpired, isSessionOutputEvent, isTerminalCommandState, isTerminalQueueTaskState, isTerminalRunStatus, parseQueueCursor, parseSessionCursor, queueTaskSort, sessionListFilters, sessionMatchesListState, sessionRefFromRecord, sessionSort, sessionTitleFromCommand, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef, titleFromMetadata } from "./store.js";
import { assertSessionBoundary, buildQueueStats, buildSessionSummary, clampQueueLimit, clampSessionLimit, commandStateFromTerminal, isLeaseExpired, isSessionOutputEvent, isTerminalCommandState, isTerminalQueueTaskState, isTerminalRunStatus, parseQueueCursor, parseSessionCursor, queueTaskPayloadHash, queueTaskSort, sessionListFilters, sessionMatchesListState, sessionRefFromRecord, sessionSort, sessionTitleFromCommand, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef, titleFromMetadata } from "./store.js";
import { backendCapabilitiesSqlValues, mergeBackendCapability } from "../common/backend-profiles.js";
import { normalizeRunEventPayload, requireEventType } from "../common/events.js";
@@ -329,6 +329,11 @@ CREATE TABLE IF NOT EXISTS agentrun_queue_read_cursors (
CREATE SEQUENCE IF NOT EXISTS agentrun_queue_version_seq;
`;
const queueSessionRefMigrationSql = `
ALTER TABLE agentrun_queue_tasks
ADD COLUMN IF NOT EXISTS session_ref jsonb;
`;
const postgresMigrations: MigrationDefinition[] = [
{
id: "001_v01_initial_durable_store",
@@ -375,6 +380,11 @@ const postgresMigrations: MigrationDefinition[] = [
checksum: checksumSql(dsflashGoModelCatalogBackendProfileMigrationSql),
sql: dsflashGoModelCatalogBackendProfileMigrationSql,
},
{
id: "010_v01_queue_session_ref",
checksum: checksumSql(queueSessionRefMigrationSql),
sql: queueSessionRefMigrationSql,
},
];
export function postgresMigrationContract(): JsonRecord {
@@ -905,7 +915,7 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
}
async createQueueTask(input: CreateQueueTaskInput): Promise<QueueTaskRecord> {
const payloadHash = stableHash(input.payload);
const payloadHash = queueTaskPayloadHash(input);
return this.withTransaction(async (client) => {
if (input.idempotencyKey) {
const existing = await client.query("SELECT * FROM agentrun_queue_tasks WHERE tenant_id = $1 AND project_id = $2 AND idempotency_key = $3 FOR UPDATE", [input.tenantId, input.projectId, input.idempotencyKey]);
@@ -917,12 +927,14 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
}
const at = nowIso();
const version = await this.nextQueueVersion(client);
const task: QueueTaskRecord = { ...input, id: newId("qt"), state: "pending", version, payloadHash, latestAttempt: null, sessionPath: null, createdAt: at, updatedAt: at, cancelledAt: null, cancelReason: null };
const sessionId = input.sessionRef?.sessionId ?? null;
const sessionPath = sessionId ? `/api/v1/sessions/${encodeURIComponent(sessionId)}` : null;
const task: QueueTaskRecord = { ...input, id: newId("qt"), state: "pending", version, payloadHash, latestAttempt: null, sessionPath, createdAt: at, updatedAt: at, cancelledAt: null, cancelReason: null };
const inserted = await client.query(
`INSERT INTO agentrun_queue_tasks (id, tenant_id, project_id, queue, lane, title, priority, state, version, backend_profile, provider_id, workspace_ref, execution_policy, resource_bundle_ref, payload, payload_hash, references_json, metadata, idempotency_key, latest_attempt, session_path, created_at, updated_at, cancelled_at, cancel_reason)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12::jsonb, $13::jsonb, $14::jsonb, $15::jsonb, $16, $17::jsonb, $18::jsonb, $19, $20::jsonb, $21, $22, $23, $24, $25)
`INSERT INTO agentrun_queue_tasks (id, tenant_id, project_id, queue, lane, title, priority, state, version, backend_profile, provider_id, workspace_ref, session_ref, execution_policy, resource_bundle_ref, payload, payload_hash, references_json, metadata, idempotency_key, latest_attempt, session_path, created_at, updated_at, cancelled_at, cancel_reason)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12::jsonb, $13::jsonb, $14::jsonb, $15::jsonb, $16::jsonb, $17, $18::jsonb, $19::jsonb, $20, $21::jsonb, $22, $23, $24, $25, $26)
RETURNING *`,
[task.id, task.tenantId, task.projectId, task.queue, task.lane, task.title, task.priority, task.state, task.version, task.backendProfile, task.providerId, JSON.stringify(task.workspaceRef), JSON.stringify(task.executionPolicy), JSON.stringify(task.resourceBundleRef), JSON.stringify(task.payload), task.payloadHash, JSON.stringify(task.references), JSON.stringify(task.metadata), task.idempotencyKey ?? null, JSON.stringify(task.latestAttempt), task.sessionPath, task.createdAt, task.updatedAt, task.cancelledAt, task.cancelReason],
[task.id, task.tenantId, task.projectId, task.queue, task.lane, task.title, task.priority, task.state, task.version, task.backendProfile, task.providerId, JSON.stringify(task.workspaceRef), JSON.stringify(task.sessionRef), JSON.stringify(task.executionPolicy), JSON.stringify(task.resourceBundleRef), JSON.stringify(task.payload), task.payloadHash, JSON.stringify(task.references), JSON.stringify(task.metadata), task.idempotencyKey ?? null, JSON.stringify(task.latestAttempt), task.sessionPath, task.createdAt, task.updatedAt, task.cancelledAt, task.cancelReason],
);
return queueTaskFromRow(inserted.rows[0]);
});
@@ -1404,6 +1416,7 @@ function queueTaskFromRow(row: QueryResultRow): QueueTaskRecord {
backendProfile: stringValue(row.backend_profile) as BackendProfile,
providerId: nullableString(row.provider_id),
workspaceRef: jsonValue(row.workspace_ref) as QueueTaskRecord["workspaceRef"],
sessionRef: jsonValue(row.session_ref) as QueueTaskRecord["sessionRef"],
executionPolicy: jsonValue(row.execution_policy) as QueueTaskRecord["executionPolicy"],
resourceBundleRef: jsonValue(row.resource_bundle_ref) as QueueTaskRecord["resourceBundleRef"],
payload: jsonRecord(row.payload),
+1
View File
@@ -99,6 +99,7 @@ function buildRunInput(task: QueueTaskRecord, input: JsonRecord): CreateRunInput
tenantId: task.tenantId,
projectId: task.projectId,
workspaceRef: task.workspaceRef,
sessionRef: task.sessionRef,
resourceBundleRef: task.resourceBundleRef,
providerId: task.providerId,
backendProfile: task.backendProfile,
+24 -2
View File
@@ -451,7 +451,7 @@ export class MemoryAgentRunStore implements AgentRunStore {
}
createQueueTask(input: CreateQueueTaskInput): QueueTaskRecord {
const payloadHash = stableHash(input.payload);
const payloadHash = queueTaskPayloadHash(input);
if (input.idempotencyKey) {
const existing = Array.from(this.queueTasks.values()).find((task) => task.tenantId === input.tenantId && task.projectId === input.projectId && task.idempotencyKey === input.idempotencyKey);
if (existing) {
@@ -460,7 +460,9 @@ export class MemoryAgentRunStore implements AgentRunStore {
}
}
const at = nowIso();
const task: QueueTaskRecord = { ...input, id: newId("qt"), state: "pending", version: this.nextQueueVersion(), payloadHash, latestAttempt: null, sessionPath: null, createdAt: at, updatedAt: at, cancelledAt: null, cancelReason: null };
const sessionId = input.sessionRef?.sessionId ?? null;
const sessionPath = sessionId ? `/api/v1/sessions/${encodeURIComponent(sessionId)}` : null;
const task: QueueTaskRecord = { ...input, id: newId("qt"), state: "pending", version: this.nextQueueVersion(), payloadHash, latestAttempt: null, sessionPath, createdAt: at, updatedAt: at, cancelledAt: null, cancelReason: null };
this.queueTasks.set(task.id, task);
return task;
}
@@ -750,6 +752,26 @@ export function queueTaskSort(a: QueueTaskRecord, b: QueueTaskRecord): number {
return a.createdAt.localeCompare(b.createdAt) || a.id.localeCompare(b.id);
}
export function queueTaskPayloadHash(input: CreateQueueTaskInput): string {
return stableHash({
tenantId: input.tenantId,
projectId: input.projectId,
queue: input.queue,
lane: input.lane,
title: input.title,
priority: input.priority,
backendProfile: input.backendProfile,
providerId: input.providerId,
workspaceRef: input.workspaceRef,
sessionRef: input.sessionRef,
executionPolicy: input.executionPolicy,
resourceBundleRef: input.resourceBundleRef,
payload: input.payload,
references: input.references,
metadata: input.metadata,
});
}
export function buildQueueStats(tasks: QueueTaskRecord[], queue: string | null, generatedAt = nowIso()): QueueStats {
const byState: Record<string, number> = {};
const byLane: Record<string, number> = {};
+3 -1
View File
@@ -13,11 +13,13 @@ const selfTest: SelfTestCase = async () => {
(error) => error instanceof AgentRunError && error.failureKind === "infra-failed" && error.message.includes("DATABASE_URL is required"),
);
const postgresContract = postgresMigrationContract();
assert.equal(postgresContract.latestMigrationId, "009_v01_dsflash_go_model_catalog");
assert.equal(postgresContract.latestMigrationId, "010_v01_queue_session_ref");
assert.equal((postgresContract.migrationIds as string[]).includes("008_v01_dsflash_go_backend_profile"), true);
assert.equal((postgresContract.migrationIds as string[]).includes("009_v01_dsflash_go_model_catalog"), true);
assert.equal((postgresContract.migrationIds as string[]).includes("010_v01_queue_session_ref"), true);
assert.ok(typeof (postgresContract.checksums as Record<string, string>)["008_v01_dsflash_go_backend_profile"] === "string" && (postgresContract.checksums as Record<string, string>)["008_v01_dsflash_go_backend_profile"].length > 0);
assert.ok(typeof (postgresContract.checksums as Record<string, string>)["009_v01_dsflash_go_model_catalog"] === "string" && (postgresContract.checksums as Record<string, string>)["009_v01_dsflash_go_model_catalog"].length > 0);
assert.ok(typeof (postgresContract.checksums as Record<string, string>)["010_v01_queue_session_ref"] === "string" && (postgresContract.checksums as Record<string, string>)["010_v01_queue_session_ref"].length > 0);
assert.equal((postgresContract.checksums as Record<string, string>)["002_v01_backend_profiles"], "928b5c490cc4539cb64ecef34784557601b2724fa2870570f16a53576804e49c");
assert.ok(Array.isArray(postgresContract.requiredTables));
assert.ok(postgresContract.requiredTables.includes("agentrun_schema_migrations"));
+8 -2
View File
@@ -19,6 +19,7 @@ const selfTest: SelfTestCase = async (context) => {
backendProfile: "codex",
providerId: "G14",
workspaceRef: { kind: "host-path", path: context.workspace },
sessionRef: { sessionId: "sess_queue_q1_selftest", metadata: { source: "queue-q1-self-test" } },
executionPolicy: null,
resourceBundleRef: null,
payload: { prompt: "hello" },
@@ -28,10 +29,15 @@ const selfTest: SelfTestCase = async (context) => {
};
const created = await client.post("/api/v1/queue/tasks", input) as QueueTaskRecord;
assert.equal(created.state, "pending");
assert.equal(created.sessionPath, null);
assert.equal(created.sessionRef?.sessionId, "sess_queue_q1_selftest");
assert.equal(created.sessionPath, "/api/v1/sessions/sess_queue_q1_selftest");
assert.equal(created.latestAttempt, null);
const duplicate = await client.post("/api/v1/queue/tasks", input) as QueueTaskRecord;
assert.equal(duplicate.id, created.id);
await assert.rejects(
() => client.post("/api/v1/queue/tasks", { ...input, sessionRef: { sessionId: "sess_queue_q1_other" } }),
(error) => error instanceof Error && error.message.includes("idempotency key reused"),
);
const listed = await client.get("/api/v1/queue/tasks?queue=dev&limit=10") as QueueTaskListResult;
assert.equal(listed.count, 1);
@@ -39,7 +45,7 @@ const selfTest: SelfTestCase = async (context) => {
const shown = await client.get(`/api/v1/queue/tasks/${created.id}`) as QueueTaskRecord;
assert.equal(shown.title, "Q1 queue task");
assert.equal(shown.sessionPath, null);
assert.equal(shown.sessionPath, "/api/v1/sessions/sess_queue_q1_selftest");
const stats = await client.get("/api/v1/queue/stats?queue=dev") as QueueStats;
assert.equal(stats.total, 1);
+6 -1
View File
@@ -44,6 +44,7 @@ console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kin
backendProfile: "codex",
providerId: "G14",
workspaceRef: { kind: "host-path", path: context.workspace },
sessionRef: { sessionId: "sess_queue_q2_dispatch_selftest", metadata: { source: "queue-q2-self-test" } },
executionPolicy: {
sandbox: "workspace-write",
approval: "never",
@@ -64,9 +65,12 @@ console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kin
assert.equal(dispatched.latestAttempt.runId, dispatched.run.id);
assert.equal(dispatched.latestAttempt.commandId, dispatched.command.id);
assert.ok(dispatched.latestAttempt.runnerJobId);
assert.equal(dispatched.latestAttempt.sessionId, "sess_queue_q2_dispatch_selftest");
assert.equal(dispatched.latestAttempt.sessionPath, "/api/v1/sessions/sess_queue_q2_dispatch_selftest");
assert.equal(dispatched.task.state, "running");
assert.equal(dispatched.task.latestAttempt?.attemptId, "attempt_queue_q2_selftest");
assert.equal(dispatched.task.sessionPath, null);
assert.equal(dispatched.task.sessionPath, "/api/v1/sessions/sess_queue_q2_dispatch_selftest");
assert.equal(dispatched.run.sessionRef?.sessionId, "sess_queue_q2_dispatch_selftest");
const shown = await client.get(`/api/v1/queue/tasks/${created.id}`) as QueueTaskRecord;
assert.equal(shown.state, "running");
@@ -88,6 +92,7 @@ console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kin
assert.equal(refreshed.state, "completed");
assert.equal(refreshed.latestAttempt?.state, "completed");
assert.equal(refreshed.latestAttempt?.runId, dispatched.run.id);
assert.equal(refreshed.latestAttempt?.sessionPath, "/api/v1/sessions/sess_queue_q2_dispatch_selftest");
const manifest = JSON.parse(await readFile(createdManifest, "utf8")) as JsonRecord;
assert.ok(JSON.stringify(manifest).includes(dispatched.run.id));
assertNoSecretLeak(dispatched);
+17 -1
View File
@@ -1,19 +1,35 @@
import assert from "node:assert/strict";
import { readFile } from "node:fs/promises";
import { execFile } from "node:child_process";
import { promisify } from "node:util";
import path from "node:path";
import type { SelfTestCase } from "../harness.js";
const requiredRunnerPackages = Object.freeze(["git", "openssh-client", "ripgrep"]);
const execFileAsync = promisify(execFile);
const selfTest: SelfTestCase = async (context) => {
const containerfile = await readFile(path.join(context.root, "deploy/container/Containerfile"), "utf8");
const apkPackages = installedApkPackages(containerfile);
const tran = await readFile(path.join(context.root, "tools/tran"), "utf8");
const trans = await readFile(path.join(context.root, "tools/trans"), "utf8");
for (const packageName of requiredRunnerPackages) {
assert.equal(apkPackages.has(packageName), true, `runner image must install ${packageName}`);
}
return { name: "90-runner-image-tools", tests: ["runner image installs required CLI tools"] };
assert.equal(tran.startsWith("#!/usr/bin/env bun\n"), true, "tools/tran must be a shebang executable discovered by gitbundle tools");
assert.equal(trans.startsWith("#!/bin/sh\n"), true, "tools/trans must be a shebang executable discovered by gitbundle tools");
assert.equal(tran.includes("UNIDESK_SSH_CLIENT_TOKEN"), true, "tools/tran must require the scoped UniDesk SSH client token");
assert.equal(tran.includes("/ws/ssh"), true, "tools/tran must use the frontend SSH WebSocket path");
const help = await execFileAsync(path.join(context.root, "tools/tran"), ["--help"], { cwd: context.root, timeout: 10_000 });
const parsed = JSON.parse(help.stdout) as { ok?: boolean; supported?: string[]; valuesPrinted?: boolean };
assert.equal(parsed.ok, true);
assert.equal(parsed.valuesPrinted, false);
assert.equal(parsed.supported?.some((line) => line.includes("script")), true);
return { name: "90-runner-image-tools", tests: ["runner image installs required CLI tools", "gitbundle tran tools are executable and documented"] };
};
function installedApkPackages(containerfile: string): Set<string> {
Executable
+270
View File
@@ -0,0 +1,270 @@
#!/usr/bin/env bun
const defaultFrontendPort = 18081;
const sshInputChunkBytes = 32 * 1024;
function jsonHelp() {
return {
ok: true,
tool: "tran",
purpose: "AgentRun runner UniDesk SSH passthrough over frontend /ws/ssh",
requiredEnv: ["UNIDESK_SSH_CLIENT_TOKEN", "UNIDESK_MAIN_SERVER_IP or UNIDESK_FRONTEND_URL"],
supported: [
"tran <provider> <command...>",
"tran <provider> argv <command...>",
"tran <provider> script -- '<shell script>'",
"tran <provider>:/absolute/workspace script -- '<shell script>'",
"tran <provider>:k3s kubectl <args...>",
"tran <provider>:k3s script -- '<shell script>'",
"tran <provider>:k3s:<namespace>:<workload>[:container] argv <command...>",
"tran <provider>:k3s:<namespace>:<workload>[:container] script -- '<shell script>'",
],
unsupported: ["apply-patch", "upload", "download", "Windows win/ps/cmd routes"],
valuesPrinted: false,
};
}
function writeJson(value, stream = process.stdout) {
stream.write(`${JSON.stringify(value)}\n`);
}
function fail(failureKind, message, details = {}, exitCode = 2) {
writeJson({ ok: false, failureKind, message, ...details, valuesPrinted: false }, process.stderr);
process.exit(exitCode);
}
function shellQuote(value) {
return `'${String(value).replace(/'/g, `'"'"'`)}'`;
}
function shellArgv(args) {
if (args.length === 0) return "";
return args.map(shellQuote).join(" ");
}
function baseUrlFromEnv(env) {
const explicit = (env.UNIDESK_FRONTEND_URL || env.UNIDESK_MAIN_SERVER_URL || "").trim();
if (explicit) return explicit.replace(/\/+$/g, "");
const host = (env.UNIDESK_MAIN_SERVER_IP || env.UNIDESK_MAIN_SERVER_HOST || env.CODE_QUEUE_DEV_CONTAINER_MASTER_HOST || "").trim();
if (!host) return null;
if (host.startsWith("http://") || host.startsWith("https://")) return host.replace(/\/+$/g, "");
if (/:[0-9]+$/u.test(host)) return `http://${host}`;
const port = Number(env.UNIDESK_FRONTEND_PORT || env.UNIDESK_MAIN_SERVER_PORT || defaultFrontendPort);
return `http://${host}:${Number.isInteger(port) && port > 0 ? port : defaultFrontendPort}`;
}
function websocketUrl(baseUrl) {
const url = new URL("/ws/ssh", baseUrl);
url.protocol = url.protocol === "https:" ? "wss:" : "ws:";
return url.toString();
}
function parseRoute(raw) {
const parts = String(raw).split(":");
const providerId = parts.shift() || "";
if (!providerId) fail("schema-invalid", "tran route requires a provider id", { route: raw });
if (parts.length === 0) return { providerId, plane: "host", workspace: null, namespace: null, resource: null, container: null, raw };
if (parts[0] === "win") fail("unsupported-operation", "AgentRun tran does not support Windows routes yet", { route: raw });
if (parts[0] === "k3s") {
return {
providerId,
plane: "k3s",
workspace: null,
namespace: parts[1] || null,
resource: parts[2] || null,
container: parts[3] || null,
raw,
};
}
const workspace = parts.join(":");
if (!workspace.startsWith("/")) fail("schema-invalid", "host workspace routes must be absolute paths", { route: raw });
return { providerId, plane: "host", workspace, namespace: null, resource: null, container: null, raw };
}
async function readStdinText() {
const chunks = [];
for await (const chunk of Bun.stdin.stream()) chunks.push(Buffer.from(chunk));
return Buffer.concat(chunks).toString("utf8");
}
async function scriptCommand(args) {
if (args[0] === "--") {
const rest = args.slice(1);
if (rest.length === 0) return await readStdinText();
if (rest.length === 1) return rest[0];
return shellArgv(rest);
}
if (args.length === 0) return await readStdinText();
return shellArgv(args);
}
async function hostCommand(route, args) {
if (args.length === 0) return { command: null, cwd: route.workspace, tty: true };
const op = args[0];
if (op === "apply-patch" || op === "upload" || op === "download") {
fail("unsupported-operation", `AgentRun tran does not support ${op}; use host/source controlled tools outside the runner for that operation`, { operation: op });
}
if (op === "script" || op === "shell") return { command: await scriptCommand(args.slice(1)), cwd: route.workspace, tty: false };
if (op === "argv") return { command: shellArgv(args.slice(1)), cwd: route.workspace, tty: false };
return { command: shellArgv(args), cwd: route.workspace, tty: false };
}
function k3sExecPrefix(route) {
const base = ["env", "KUBECONFIG=/etc/rancher/k3s/k3s.yaml", "kubectl", "exec"];
if (route.namespace) base.push("-n", route.namespace);
if (!route.resource) fail("schema-invalid", "k3s workload routes require namespace and resource", { route: route.raw });
base.push(route.resource);
if (route.container) base.push("-c", route.container);
base.push("--");
return base;
}
async function k3sCommand(route, args) {
const op = args[0] || "kubectl";
if (op === "apply-patch" || op === "upload" || op === "download") {
fail("unsupported-operation", `AgentRun tran does not support ${op}; use host/source controlled tools outside the runner for that operation`, { operation: op });
}
if (!route.resource) {
if (op === "kubectl") return { command: shellArgv(["env", "KUBECONFIG=/etc/rancher/k3s/k3s.yaml", "kubectl", ...args.slice(1)]), cwd: null, tty: false };
if (op === "script" || op === "shell") {
const script = await scriptCommand(args.slice(1));
return { command: `export KUBECONFIG=/etc/rancher/k3s/k3s.yaml; ${script}`, cwd: null, tty: false };
}
if (op === "argv") return { command: shellArgv(["env", "KUBECONFIG=/etc/rancher/k3s/k3s.yaml", ...args.slice(1)]), cwd: null, tty: false };
return { command: shellArgv(["env", "KUBECONFIG=/etc/rancher/k3s/k3s.yaml", ...args]), cwd: null, tty: false };
}
if (op === "script" || op === "shell") {
const script = await scriptCommand(args.slice(1));
return { command: shellArgv([...k3sExecPrefix(route), "sh", "-lc", script]), cwd: null, tty: false };
}
if (op === "argv") return { command: shellArgv([...k3sExecPrefix(route), ...args.slice(1)]), cwd: null, tty: false };
return { command: shellArgv([...k3sExecPrefix(route), ...args]), cwd: null, tty: false };
}
async function buildOpenPayload(argv) {
if (argv.length === 0) fail("schema-invalid", "tran requires a route", { help: jsonHelp() });
const route = parseRoute(argv[0]);
const parsed = route.plane === "k3s" ? await k3sCommand(route, argv.slice(1)) : await hostCommand(route, argv.slice(1));
return {
providerId: route.providerId,
command: parsed.command || undefined,
cwd: parsed.cwd || undefined,
tty: parsed.tty === true,
cols: Number(process.stdout.columns) > 0 ? Number(process.stdout.columns) : 100,
rows: Number(process.stdout.rows) > 0 ? Number(process.stdout.rows) : 30,
openTimeoutMs: Math.max(15000, Math.min(Number(process.env.UNIDESK_SSH_OPEN_TIMEOUT_MS || 60000), 60000)),
runtimeTimeoutMs: Math.max(1000, Math.min(Number(process.env.UNIDESK_SSH_RUNTIME_TIMEOUT_MS || process.env.UNIDESK_TRAN_RUNTIME_TIMEOUT_MS || 60000), 60000)),
stdinEotOnEnd: true,
route: route.raw,
};
}
async function main() {
const argv = process.argv.slice(2);
if (argv[0] === "--help" || argv[0] === "help" || argv[0] === "-h") {
writeJson(jsonHelp());
return;
}
const token = (process.env.UNIDESK_SSH_CLIENT_TOKEN || "").trim();
if (!token) fail("secret-unavailable", "UNIDESK_SSH_CLIENT_TOKEN is required for runner-side tran");
const baseUrl = baseUrlFromEnv(process.env);
if (!baseUrl) fail("schema-invalid", "UNIDESK_MAIN_SERVER_IP, UNIDESK_MAIN_SERVER_HOST, or UNIDESK_FRONTEND_URL is required for runner-side tran");
const open = await buildOpenPayload(argv);
await runWebSocket(open, websocketUrl(baseUrl), token);
}
async function runWebSocket(open, url, token) {
const ws = new WebSocket(url, { headers: { authorization: `Bearer ${token}` } });
let exitCode = 255;
let canSend = false;
let sessionReady = false;
let settled = false;
const pending = [];
const pendingInput = [];
const send = (value) => {
const text = JSON.stringify(value);
if (!canSend || ws.readyState !== WebSocket.OPEN) pending.push(text);
else ws.send(text);
};
const sendInput = (value) => {
const text = JSON.stringify(value);
if (!sessionReady || ws.readyState !== WebSocket.OPEN) pendingInput.push(text);
else ws.send(text);
};
const flush = () => {
while (pending.length > 0 && ws.readyState === WebSocket.OPEN) ws.send(pending.shift());
};
const flushInput = () => {
if (!sessionReady || ws.readyState !== WebSocket.OPEN) return;
while (pendingInput.length > 0) ws.send(pendingInput.shift());
};
const finish = (code) => {
if (settled) return;
settled = true;
clearTimeout(openTimer);
clearTimeout(runtimeTimer);
process.exit(code);
};
const openTimer = setTimeout(() => {
if (sessionReady || settled) return;
process.stderr.write("unidesk runner tran timed out waiting for provider session\n");
exitCode = 255;
try { ws.close(); } catch {}
}, open.openTimeoutMs);
const runtimeTimer = setTimeout(() => {
if (settled) return;
process.stderr.write(`UNIDESK_TRAN_TIMEOUT_HINT ${JSON.stringify({ code: "tran-runtime-timeout", level: "warning", route: open.route, timeoutMs: open.runtimeTimeoutMs, message: "tran exceeded the runtime limit; use short query plus poll semantics" })}\n`);
exitCode = 124;
try { ws.close(); } catch {}
finish(124);
}, open.runtimeTimeoutMs);
ws.addEventListener("open", () => {
canSend = true;
send({ type: "ssh.open", providerId: open.providerId, command: open.command, cwd: open.cwd, tty: open.tty, cols: open.cols, rows: open.rows });
flush();
});
ws.addEventListener("message", (event) => {
let message;
const text = typeof event.data === "string" ? event.data : Buffer.from(event.data).toString("utf8");
try {
message = JSON.parse(text);
} catch {
process.stderr.write(`${text}\n`);
return;
}
if (message.type === "ssh.dispatched") return;
if (message.type === "ssh.opened") {
sessionReady = true;
clearTimeout(openTimer);
sendInput({ type: "ssh.input", data: Buffer.from([4]).toString("base64"), encoding: "base64" });
sendInput({ type: "ssh.eof" });
flushInput();
return;
}
if (message.type === "ssh.data") {
const chunk = Buffer.from(String(message.data || ""), message.encoding === "base64" ? "base64" : "utf8");
if (message.stream === "stderr") process.stderr.write(chunk);
else process.stdout.write(chunk);
return;
}
if (message.type === "ssh.error") {
process.stderr.write(`${String(message.message || "ssh bridge error")}\n`);
exitCode = 255;
try { ws.close(); } catch {}
return;
}
if (message.type === "ssh.exit") {
exitCode = Number.isInteger(message.exitCode) ? Number(message.exitCode) : 255;
try { ws.close(); } catch {}
}
});
ws.addEventListener("close", () => finish(exitCode));
ws.addEventListener("error", () => {
process.stderr.write("unidesk runner tran websocket error\n");
finish(255);
});
}
await main();
Executable
+5
View File
@@ -0,0 +1,5 @@
#!/bin/sh
set -eu
self_dir=$(CDPATH= cd -- "$(dirname -- "$0")" && pwd)
exec "$self_dir/tran" "$@"