diff --git a/docs/reference/spec-v01-postgres.md b/docs/reference/spec-v01-postgres.md index 643719e..870787c 100644 --- a/docs/reference/spec-v01-postgres.md +++ b/docs/reference/spec-v01-postgres.md @@ -37,7 +37,7 @@ Secret 名称和 key 可以在实现时按 Kubernetes 命名限制微调,但 - `agentrun_runner_jobs`:手动 runner Job 的 idempotency key、payload hash、attempt/job identity 和创建响应。 - `agentrun_sessions`:SessionRef 到 backend thread/cache identity、execution projection、active run/command、terminal/unread 水位的映射,不保存 credential 文件或 Secret 值。 - `agentrun_session_read_cursors`:按 reader 记录 Session 已读 version,用于 CLI 默认只看 running/unread。 -- `agentrun_queue_tasks`:AgentRun Queue task identity、queue/lane、tenant/project、priority、state、backendProfile、workspace/resource 引用和 version。 +- `agentrun_queue_tasks`:AgentRun Queue task identity、queue/lane、tenant/project、priority、state、backendProfile、workspace/resource 引用、可选 `sessionRef` 和 version;携带 `sessionRef` 的 task dispatch 后必须把 `sessionPath` 写回 Queue 记录。 - `agentrun_queue_attempts`:task attempt identity、runId、commandId、runnerJobId、sessionId、state、failureKind、retry index 和 timestamps。 - `agentrun_task_summaries` / `agentrun_attempt_summaries`:Queue 列表、详情和 commander 使用的轻量摘要;不能从 Core trace 反推 overview。 - `agentrun_queue_stats`:按 queue/lane/state/backendProfile 聚合的统计水位。 diff --git a/docs/reference/spec-v01-queue.md b/docs/reference/spec-v01-queue.md index 520a968..e5a7949 100644 --- a/docs/reference/spec-v01-queue.md +++ b/docs/reference/spec-v01-queue.md @@ -107,7 +107,7 @@ Session 命令负责输出、trace 和会话控制: Queue 首版新增或扩展的稳定表方向: -- `agentrun_queue_tasks`:任务 identity、tenant/project、queue/lane、title、priority、state、backendProfile、workspace/resource 引用、创建者、version 和 timestamps。 +- `agentrun_queue_tasks`:任务 identity、tenant/project、queue/lane、title、priority、state、backendProfile、workspace/resource 引用、可选 `sessionRef`、创建者、version 和 timestamps。 - `agentrun_queue_attempts`:attempt identity、taskId、runId、commandId、runnerJobId、sessionId、state、failureKind、retry index 和 timestamps。 - `agentrun_task_summaries`:task 级摘要、当前状态、最新 attempt、最新 sessionPath、最后用户可见摘要和统计字段。 - `agentrun_attempt_summaries`:attempt 级摘要、terminalStatus、failureKind、runner identity、耗时和有界输出摘要引用。 @@ -122,7 +122,7 @@ Queue 首版新增或扩展的稳定表方向: | 旧 Code Queue 能力 | AgentRun 目标 | 吸收方式 | 不保留内容 | | --- | --- | --- | --- | -| task 记录 | `agentrun_queue_tasks` | 直接建模为 Queue task | 旧 API 字段兼容 | +| task 记录 | `agentrun_queue_tasks` | 直接建模为 Queue task;需要 trace/reuse 时必须携带 `sessionRef` 并在 dispatch 后暴露 `sessionPath` | 旧 API 字段兼容 | | queueId、move、merge | queue/lane 字段和 Queue API | 保留队列归类与移动语义 | 旧 UI 操作合同 | | attempt | `agentrun_queue_attempts` + Core run/command/runner job | attempt 引用真实执行资源 | 旧 attempt 输出结构 | | `processQueue` / `runTask` | AgentRun Scheduler | 由 Scheduler 扫描 pending task/attempt 并创建 runner job | UniDesk Code Queue scheduler | diff --git a/docs/reference/spec-v01-runtime-assembly.md b/docs/reference/spec-v01-runtime-assembly.md index b7b70c8..700b3fb 100644 --- a/docs/reference/spec-v01-runtime-assembly.md +++ b/docs/reference/spec-v01-runtime-assembly.md @@ -168,6 +168,8 @@ HWLAB Workbench 的 project/workspace 不属于 RuntimeAssembly 四要素,也 runner 对 workspace `tools/` 做统一装配:顶层带 shebang 的脚本会被 `chmod +x`,`tools/` 目录会追加到 `PATH`。非 shebang 文件是随 bundle 复制的源码、测试或辅助文件,不作为可执行工具发现,也不触发 schema-invalid。短命令名称来自 repo 内真实文件,例如 `tools/hwpod`,不再由 runner 生成 wrapper。 +AgentRun 自身仓库必须提供 `tools/tran` 与 `tools/trans`,用于承接 UniDesk frontend `/ws/ssh` 的 scoped client-token 透传。runner 只通过 `executionPolicy.secretScope.toolCredentials[]` 投影 `UNIDESK_SSH_CLIENT_TOKEN`,并通过 `transientEnv` 注入非敏感 `UNIDESK_MAIN_SERVER_IP` / `UNIDESK_FRONTEND_URL`;工具不得读取 provider token、主 server SSH key 或完整 frontend 登录态。`tran --help` 必须输出 JSON,并列出当前支持的最小开发面:host/host workspace `script`、`argv`、普通 ssh-like 命令、`k3s kubectl`、`k3s script` 和 k3s workload `argv/script`。未实现的 `apply-patch`、`upload`、`download` 和 Windows route 必须显式 `unsupported-operation`,不能静默改走不受控 shell 拼接或 token fallback。 + #### promptRefs `promptRefs` 用于按同一个 materialized gitbundle checkout 的 `path` 装配初始 prompt。它承载业务域稳定 runtime/developer instruction,例如某个项目的标准入口、禁止路径和工具使用纪律;它不承载用户本轮 message,也不承载历史会话。 diff --git a/scripts/src/cli.ts b/scripts/src/cli.ts index 43e8b6e..570fd20 100644 --- a/scripts/src/cli.ts +++ b/scripts/src/cli.ts @@ -1,6 +1,6 @@ import { mkdir, readFile, rm, writeFile } from "node:fs/promises"; import { randomUUID } from "node:crypto"; -import { spawn } from "node:child_process"; +import { execFileSync, spawn } from "node:child_process"; import { closeSync, existsSync, openSync } from "node:fs"; import path from "node:path"; import { startManagerServer } from "../../src/mgr/server.js"; @@ -33,7 +33,10 @@ export async function runCli(argv: string[]): Promise { async function dispatch(args: ParsedArgs): Promise { const [group, command, id] = args.positional; - if (!group || group === "help") return help(); + if (!group || group === "help" || group === "--help") return help(args); + if (args.flags.get("help") === true) return help(args, group); + if (command === "help" || command === "--help") return help(args, group); + if (group === "manager" && (command === "url" || command === "resolve-url" || command === "status")) return managerEndpoint(args); if (group === "server" && command === "start") return startServer(args); if (group === "server" && command === "status") return serverStatus(args); if (group === "server" && command === "logs") return serverLogs(args); @@ -556,7 +559,40 @@ function client(args: ParsedArgs): ManagerClient { } function managerUrl(args: ParsedArgs): string { - return optionalFlag(args, "manager-url") ?? process.env.AGENTRUN_MGR_URL ?? "http://127.0.0.1:8080"; + const explicit = optionalFlag(args, "manager-url") ?? process.env.AGENTRUN_MGR_URL; + if (explicit && explicit !== "auto") return explicit; + if (explicit === "auto") return resolveRuntimeManagerUrl(); + return "http://127.0.0.1:8080"; +} + +function managerEndpoint(args: ParsedArgs): JsonRecord { + const url = managerUrl(args); + const explicit = optionalFlag(args, "manager-url") ?? process.env.AGENTRUN_MGR_URL ?? null; + return { + action: "manager-endpoint", + managerUrl: url, + source: explicit === "auto" ? "auto" : explicit ? (optionalFlag(args, "manager-url") ? "--manager-url" : "AGENTRUN_MGR_URL") : "default-localhost", + runtimeNamespace: "agentrun-v01", + serviceName: "agentrun-mgr", + valuesPrinted: false, + examples: { + commander: "./scripts/agentrun --manager-url auto queue commander", + sessions: "./scripts/agentrun --manager-url auto sessions ps --state default --reader-id cli", + explicit: "./scripts/agentrun --manager-url http://:8080 queue commander", + }, + }; +} + +function resolveRuntimeManagerUrl(): string { + const fromEnv = process.env.AGENTRUN_RUNTIME_MANAGER_URL; + if (fromEnv) return fromEnv; + try { + const serviceIp = execFileSync("kubectl", ["-n", "agentrun-v01", "get", "svc", "agentrun-mgr", "-o", "jsonpath={.spec.clusterIP}"], { encoding: "utf8", stdio: ["ignore", "pipe", "ignore"] }).trim(); + if (serviceIp.length > 0 && serviceIp !== "None") return `http://${serviceIp}:8080`; + } catch { + // Fall through to localhost so the following request produces a structured connection failure. + } + return "http://127.0.0.1:8080"; } function portFromManagerUrl(args: ParsedArgs): string { @@ -758,9 +794,8 @@ function cancelBody(args: ParsedArgs): JsonRecord { return reason ? { reason } : {}; } -function help(): JsonRecord { - return { - commands: [ +function help(args: ParsedArgs, group?: string): JsonRecord { + const commands = [ "runs create --json-file ", "runs show ", "runs events --after-seq --limit ", @@ -808,7 +843,16 @@ function help(): JsonRecord { "server status [--port ]", "server logs [--port ] [--tail-bytes ] [--log-file ]", "server stop [--port ]", - ], + ]; + if (group) { + const prefix = `${group} `; + const groupCommands = commands.filter((item) => item === group || item.startsWith(prefix)); + return { group, commands: groupCommands, commandCount: groupCommands.length, supported: groupCommands.length > 0, manager: managerEndpoint(args) }; + } + return { + groups: [...new Set(commands.map((item) => item.split(" ")[0]).filter(Boolean))].sort(), + commands, + manager: managerEndpoint(args), }; } diff --git a/src/common/types.ts b/src/common/types.ts index 855ca6b..d4959d3 100644 --- a/src/common/types.ts +++ b/src/common/types.ts @@ -309,6 +309,7 @@ export interface CreateQueueTaskInput extends JsonRecord { backendProfile: BackendProfile; providerId: string | null; workspaceRef: WorkspaceRef | null; + sessionRef: SessionRef | null; executionPolicy: ExecutionPolicy | null; resourceBundleRef: ResourceBundleRef | null; payload: JsonRecord; diff --git a/src/common/validation.ts b/src/common/validation.ts index a503292..1d6087e 100644 --- a/src/common/validation.ts +++ b/src/common/validation.ts @@ -318,6 +318,7 @@ export function validateCreateQueueTask(input: unknown): CreateQueueTaskInput { backendProfile: backendProfileValue, providerId: optionalString(record.providerId) ?? null, workspaceRef: record.workspaceRef === undefined || record.workspaceRef === null ? null : requiredRecord(record, "workspaceRef") as CreateQueueTaskInput["workspaceRef"], + sessionRef: validateSessionRef(record.sessionRef), executionPolicy: record.executionPolicy === undefined || record.executionPolicy === null ? null : validateExecutionPolicy(requiredRecord(record, "executionPolicy")), resourceBundleRef: validateResourceBundleRef(record.resourceBundleRef), payload: record.payload === undefined ? {} : asRecord(record.payload, "payload"), diff --git a/src/mgr/postgres-store.ts b/src/mgr/postgres-store.ts index 8ab5edc..489ab08 100644 --- a/src/mgr/postgres-store.ts +++ b/src/mgr/postgres-store.ts @@ -6,7 +6,7 @@ import { redactJson } from "../common/redaction.js"; import type { BackendProfile, BackendTurnResult, CommandRecord, CommandState, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, EventType, FailureKind, JsonRecord, JsonValue, ListGcExpiredSessionsInput, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, RunStatus, SessionEventPage, SessionListResult, SessionReadCursorRecord, SessionRecord, SessionRef, SessionStoragePatch, SessionSummary, TerminalStatus, UpsertSessionInput } from "../common/types.js"; import { newId, nowIso, stableHash } from "../common/validation.js"; import type { AgentRunStore, ListQueueTasksInput, ListSessionsInput, SaveRunnerJobInput, SessionEventPageInput, StoreHealth, UpdateQueueTaskAttemptInput } from "./store.js"; -import { assertSessionBoundary, buildQueueStats, buildSessionSummary, clampQueueLimit, clampSessionLimit, commandStateFromTerminal, isLeaseExpired, isSessionOutputEvent, isTerminalCommandState, isTerminalQueueTaskState, isTerminalRunStatus, parseQueueCursor, parseSessionCursor, queueTaskSort, sessionListFilters, sessionMatchesListState, sessionRefFromRecord, sessionSort, sessionTitleFromCommand, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef, titleFromMetadata } from "./store.js"; +import { assertSessionBoundary, buildQueueStats, buildSessionSummary, clampQueueLimit, clampSessionLimit, commandStateFromTerminal, isLeaseExpired, isSessionOutputEvent, isTerminalCommandState, isTerminalQueueTaskState, isTerminalRunStatus, parseQueueCursor, parseSessionCursor, queueTaskPayloadHash, queueTaskSort, sessionListFilters, sessionMatchesListState, sessionRefFromRecord, sessionSort, sessionTitleFromCommand, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef, titleFromMetadata } from "./store.js"; import { backendCapabilitiesSqlValues, mergeBackendCapability } from "../common/backend-profiles.js"; import { normalizeRunEventPayload, requireEventType } from "../common/events.js"; @@ -329,6 +329,11 @@ CREATE TABLE IF NOT EXISTS agentrun_queue_read_cursors ( CREATE SEQUENCE IF NOT EXISTS agentrun_queue_version_seq; `; +const queueSessionRefMigrationSql = ` +ALTER TABLE agentrun_queue_tasks + ADD COLUMN IF NOT EXISTS session_ref jsonb; +`; + const postgresMigrations: MigrationDefinition[] = [ { id: "001_v01_initial_durable_store", @@ -375,6 +380,11 @@ const postgresMigrations: MigrationDefinition[] = [ checksum: checksumSql(dsflashGoModelCatalogBackendProfileMigrationSql), sql: dsflashGoModelCatalogBackendProfileMigrationSql, }, + { + id: "010_v01_queue_session_ref", + checksum: checksumSql(queueSessionRefMigrationSql), + sql: queueSessionRefMigrationSql, + }, ]; export function postgresMigrationContract(): JsonRecord { @@ -905,7 +915,7 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( } async createQueueTask(input: CreateQueueTaskInput): Promise { - const payloadHash = stableHash(input.payload); + const payloadHash = queueTaskPayloadHash(input); return this.withTransaction(async (client) => { if (input.idempotencyKey) { const existing = await client.query("SELECT * FROM agentrun_queue_tasks WHERE tenant_id = $1 AND project_id = $2 AND idempotency_key = $3 FOR UPDATE", [input.tenantId, input.projectId, input.idempotencyKey]); @@ -917,12 +927,14 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( } const at = nowIso(); const version = await this.nextQueueVersion(client); - const task: QueueTaskRecord = { ...input, id: newId("qt"), state: "pending", version, payloadHash, latestAttempt: null, sessionPath: null, createdAt: at, updatedAt: at, cancelledAt: null, cancelReason: null }; + const sessionId = input.sessionRef?.sessionId ?? null; + const sessionPath = sessionId ? `/api/v1/sessions/${encodeURIComponent(sessionId)}` : null; + const task: QueueTaskRecord = { ...input, id: newId("qt"), state: "pending", version, payloadHash, latestAttempt: null, sessionPath, createdAt: at, updatedAt: at, cancelledAt: null, cancelReason: null }; const inserted = await client.query( - `INSERT INTO agentrun_queue_tasks (id, tenant_id, project_id, queue, lane, title, priority, state, version, backend_profile, provider_id, workspace_ref, execution_policy, resource_bundle_ref, payload, payload_hash, references_json, metadata, idempotency_key, latest_attempt, session_path, created_at, updated_at, cancelled_at, cancel_reason) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12::jsonb, $13::jsonb, $14::jsonb, $15::jsonb, $16, $17::jsonb, $18::jsonb, $19, $20::jsonb, $21, $22, $23, $24, $25) + `INSERT INTO agentrun_queue_tasks (id, tenant_id, project_id, queue, lane, title, priority, state, version, backend_profile, provider_id, workspace_ref, session_ref, execution_policy, resource_bundle_ref, payload, payload_hash, references_json, metadata, idempotency_key, latest_attempt, session_path, created_at, updated_at, cancelled_at, cancel_reason) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12::jsonb, $13::jsonb, $14::jsonb, $15::jsonb, $16::jsonb, $17, $18::jsonb, $19::jsonb, $20, $21::jsonb, $22, $23, $24, $25, $26) RETURNING *`, - [task.id, task.tenantId, task.projectId, task.queue, task.lane, task.title, task.priority, task.state, task.version, task.backendProfile, task.providerId, JSON.stringify(task.workspaceRef), JSON.stringify(task.executionPolicy), JSON.stringify(task.resourceBundleRef), JSON.stringify(task.payload), task.payloadHash, JSON.stringify(task.references), JSON.stringify(task.metadata), task.idempotencyKey ?? null, JSON.stringify(task.latestAttempt), task.sessionPath, task.createdAt, task.updatedAt, task.cancelledAt, task.cancelReason], + [task.id, task.tenantId, task.projectId, task.queue, task.lane, task.title, task.priority, task.state, task.version, task.backendProfile, task.providerId, JSON.stringify(task.workspaceRef), JSON.stringify(task.sessionRef), JSON.stringify(task.executionPolicy), JSON.stringify(task.resourceBundleRef), JSON.stringify(task.payload), task.payloadHash, JSON.stringify(task.references), JSON.stringify(task.metadata), task.idempotencyKey ?? null, JSON.stringify(task.latestAttempt), task.sessionPath, task.createdAt, task.updatedAt, task.cancelledAt, task.cancelReason], ); return queueTaskFromRow(inserted.rows[0]); }); @@ -1404,6 +1416,7 @@ function queueTaskFromRow(row: QueryResultRow): QueueTaskRecord { backendProfile: stringValue(row.backend_profile) as BackendProfile, providerId: nullableString(row.provider_id), workspaceRef: jsonValue(row.workspace_ref) as QueueTaskRecord["workspaceRef"], + sessionRef: jsonValue(row.session_ref) as QueueTaskRecord["sessionRef"], executionPolicy: jsonValue(row.execution_policy) as QueueTaskRecord["executionPolicy"], resourceBundleRef: jsonValue(row.resource_bundle_ref) as QueueTaskRecord["resourceBundleRef"], payload: jsonRecord(row.payload), diff --git a/src/mgr/queue-dispatch.ts b/src/mgr/queue-dispatch.ts index 1582c46..b011ce4 100644 --- a/src/mgr/queue-dispatch.ts +++ b/src/mgr/queue-dispatch.ts @@ -99,6 +99,7 @@ function buildRunInput(task: QueueTaskRecord, input: JsonRecord): CreateRunInput tenantId: task.tenantId, projectId: task.projectId, workspaceRef: task.workspaceRef, + sessionRef: task.sessionRef, resourceBundleRef: task.resourceBundleRef, providerId: task.providerId, backendProfile: task.backendProfile, diff --git a/src/mgr/store.ts b/src/mgr/store.ts index 558bb45..634c6fb 100644 --- a/src/mgr/store.ts +++ b/src/mgr/store.ts @@ -451,7 +451,7 @@ export class MemoryAgentRunStore implements AgentRunStore { } createQueueTask(input: CreateQueueTaskInput): QueueTaskRecord { - const payloadHash = stableHash(input.payload); + const payloadHash = queueTaskPayloadHash(input); if (input.idempotencyKey) { const existing = Array.from(this.queueTasks.values()).find((task) => task.tenantId === input.tenantId && task.projectId === input.projectId && task.idempotencyKey === input.idempotencyKey); if (existing) { @@ -460,7 +460,9 @@ export class MemoryAgentRunStore implements AgentRunStore { } } const at = nowIso(); - const task: QueueTaskRecord = { ...input, id: newId("qt"), state: "pending", version: this.nextQueueVersion(), payloadHash, latestAttempt: null, sessionPath: null, createdAt: at, updatedAt: at, cancelledAt: null, cancelReason: null }; + const sessionId = input.sessionRef?.sessionId ?? null; + const sessionPath = sessionId ? `/api/v1/sessions/${encodeURIComponent(sessionId)}` : null; + const task: QueueTaskRecord = { ...input, id: newId("qt"), state: "pending", version: this.nextQueueVersion(), payloadHash, latestAttempt: null, sessionPath, createdAt: at, updatedAt: at, cancelledAt: null, cancelReason: null }; this.queueTasks.set(task.id, task); return task; } @@ -750,6 +752,26 @@ export function queueTaskSort(a: QueueTaskRecord, b: QueueTaskRecord): number { return a.createdAt.localeCompare(b.createdAt) || a.id.localeCompare(b.id); } +export function queueTaskPayloadHash(input: CreateQueueTaskInput): string { + return stableHash({ + tenantId: input.tenantId, + projectId: input.projectId, + queue: input.queue, + lane: input.lane, + title: input.title, + priority: input.priority, + backendProfile: input.backendProfile, + providerId: input.providerId, + workspaceRef: input.workspaceRef, + sessionRef: input.sessionRef, + executionPolicy: input.executionPolicy, + resourceBundleRef: input.resourceBundleRef, + payload: input.payload, + references: input.references, + metadata: input.metadata, + }); +} + export function buildQueueStats(tasks: QueueTaskRecord[], queue: string | null, generatedAt = nowIso()): QueueStats { const byState: Record = {}; const byLane: Record = {}; diff --git a/src/selftest/cases/00-redaction-postgres.ts b/src/selftest/cases/00-redaction-postgres.ts index 6f7371a..f8f2dc1 100644 --- a/src/selftest/cases/00-redaction-postgres.ts +++ b/src/selftest/cases/00-redaction-postgres.ts @@ -13,11 +13,13 @@ const selfTest: SelfTestCase = async () => { (error) => error instanceof AgentRunError && error.failureKind === "infra-failed" && error.message.includes("DATABASE_URL is required"), ); const postgresContract = postgresMigrationContract(); - assert.equal(postgresContract.latestMigrationId, "009_v01_dsflash_go_model_catalog"); + assert.equal(postgresContract.latestMigrationId, "010_v01_queue_session_ref"); assert.equal((postgresContract.migrationIds as string[]).includes("008_v01_dsflash_go_backend_profile"), true); assert.equal((postgresContract.migrationIds as string[]).includes("009_v01_dsflash_go_model_catalog"), true); + assert.equal((postgresContract.migrationIds as string[]).includes("010_v01_queue_session_ref"), true); assert.ok(typeof (postgresContract.checksums as Record)["008_v01_dsflash_go_backend_profile"] === "string" && (postgresContract.checksums as Record)["008_v01_dsflash_go_backend_profile"].length > 0); assert.ok(typeof (postgresContract.checksums as Record)["009_v01_dsflash_go_model_catalog"] === "string" && (postgresContract.checksums as Record)["009_v01_dsflash_go_model_catalog"].length > 0); + assert.ok(typeof (postgresContract.checksums as Record)["010_v01_queue_session_ref"] === "string" && (postgresContract.checksums as Record)["010_v01_queue_session_ref"].length > 0); assert.equal((postgresContract.checksums as Record)["002_v01_backend_profiles"], "928b5c490cc4539cb64ecef34784557601b2724fa2870570f16a53576804e49c"); assert.ok(Array.isArray(postgresContract.requiredTables)); assert.ok(postgresContract.requiredTables.includes("agentrun_schema_migrations")); diff --git a/src/selftest/cases/70-queue-q1.ts b/src/selftest/cases/70-queue-q1.ts index ce21671..b56d1c6 100644 --- a/src/selftest/cases/70-queue-q1.ts +++ b/src/selftest/cases/70-queue-q1.ts @@ -19,6 +19,7 @@ const selfTest: SelfTestCase = async (context) => { backendProfile: "codex", providerId: "G14", workspaceRef: { kind: "host-path", path: context.workspace }, + sessionRef: { sessionId: "sess_queue_q1_selftest", metadata: { source: "queue-q1-self-test" } }, executionPolicy: null, resourceBundleRef: null, payload: { prompt: "hello" }, @@ -28,10 +29,15 @@ const selfTest: SelfTestCase = async (context) => { }; const created = await client.post("/api/v1/queue/tasks", input) as QueueTaskRecord; assert.equal(created.state, "pending"); - assert.equal(created.sessionPath, null); + assert.equal(created.sessionRef?.sessionId, "sess_queue_q1_selftest"); + assert.equal(created.sessionPath, "/api/v1/sessions/sess_queue_q1_selftest"); assert.equal(created.latestAttempt, null); const duplicate = await client.post("/api/v1/queue/tasks", input) as QueueTaskRecord; assert.equal(duplicate.id, created.id); + await assert.rejects( + () => client.post("/api/v1/queue/tasks", { ...input, sessionRef: { sessionId: "sess_queue_q1_other" } }), + (error) => error instanceof Error && error.message.includes("idempotency key reused"), + ); const listed = await client.get("/api/v1/queue/tasks?queue=dev&limit=10") as QueueTaskListResult; assert.equal(listed.count, 1); @@ -39,7 +45,7 @@ const selfTest: SelfTestCase = async (context) => { const shown = await client.get(`/api/v1/queue/tasks/${created.id}`) as QueueTaskRecord; assert.equal(shown.title, "Q1 queue task"); - assert.equal(shown.sessionPath, null); + assert.equal(shown.sessionPath, "/api/v1/sessions/sess_queue_q1_selftest"); const stats = await client.get("/api/v1/queue/stats?queue=dev") as QueueStats; assert.equal(stats.total, 1); diff --git a/src/selftest/cases/75-queue-q2-dispatch.ts b/src/selftest/cases/75-queue-q2-dispatch.ts index b970ff4..afb7b28 100644 --- a/src/selftest/cases/75-queue-q2-dispatch.ts +++ b/src/selftest/cases/75-queue-q2-dispatch.ts @@ -44,6 +44,7 @@ console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kin backendProfile: "codex", providerId: "G14", workspaceRef: { kind: "host-path", path: context.workspace }, + sessionRef: { sessionId: "sess_queue_q2_dispatch_selftest", metadata: { source: "queue-q2-self-test" } }, executionPolicy: { sandbox: "workspace-write", approval: "never", @@ -64,9 +65,12 @@ console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kin assert.equal(dispatched.latestAttempt.runId, dispatched.run.id); assert.equal(dispatched.latestAttempt.commandId, dispatched.command.id); assert.ok(dispatched.latestAttempt.runnerJobId); + assert.equal(dispatched.latestAttempt.sessionId, "sess_queue_q2_dispatch_selftest"); + assert.equal(dispatched.latestAttempt.sessionPath, "/api/v1/sessions/sess_queue_q2_dispatch_selftest"); assert.equal(dispatched.task.state, "running"); assert.equal(dispatched.task.latestAttempt?.attemptId, "attempt_queue_q2_selftest"); - assert.equal(dispatched.task.sessionPath, null); + assert.equal(dispatched.task.sessionPath, "/api/v1/sessions/sess_queue_q2_dispatch_selftest"); + assert.equal(dispatched.run.sessionRef?.sessionId, "sess_queue_q2_dispatch_selftest"); const shown = await client.get(`/api/v1/queue/tasks/${created.id}`) as QueueTaskRecord; assert.equal(shown.state, "running"); @@ -88,6 +92,7 @@ console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kin assert.equal(refreshed.state, "completed"); assert.equal(refreshed.latestAttempt?.state, "completed"); assert.equal(refreshed.latestAttempt?.runId, dispatched.run.id); + assert.equal(refreshed.latestAttempt?.sessionPath, "/api/v1/sessions/sess_queue_q2_dispatch_selftest"); const manifest = JSON.parse(await readFile(createdManifest, "utf8")) as JsonRecord; assert.ok(JSON.stringify(manifest).includes(dispatched.run.id)); assertNoSecretLeak(dispatched); diff --git a/src/selftest/cases/90-runner-image-tools.ts b/src/selftest/cases/90-runner-image-tools.ts index fdd9e83..70fe28a 100644 --- a/src/selftest/cases/90-runner-image-tools.ts +++ b/src/selftest/cases/90-runner-image-tools.ts @@ -1,19 +1,35 @@ import assert from "node:assert/strict"; import { readFile } from "node:fs/promises"; +import { execFile } from "node:child_process"; +import { promisify } from "node:util"; import path from "node:path"; import type { SelfTestCase } from "../harness.js"; const requiredRunnerPackages = Object.freeze(["git", "openssh-client", "ripgrep"]); +const execFileAsync = promisify(execFile); const selfTest: SelfTestCase = async (context) => { const containerfile = await readFile(path.join(context.root, "deploy/container/Containerfile"), "utf8"); const apkPackages = installedApkPackages(containerfile); + const tran = await readFile(path.join(context.root, "tools/tran"), "utf8"); + const trans = await readFile(path.join(context.root, "tools/trans"), "utf8"); for (const packageName of requiredRunnerPackages) { assert.equal(apkPackages.has(packageName), true, `runner image must install ${packageName}`); } - return { name: "90-runner-image-tools", tests: ["runner image installs required CLI tools"] }; + assert.equal(tran.startsWith("#!/usr/bin/env bun\n"), true, "tools/tran must be a shebang executable discovered by gitbundle tools"); + assert.equal(trans.startsWith("#!/bin/sh\n"), true, "tools/trans must be a shebang executable discovered by gitbundle tools"); + assert.equal(tran.includes("UNIDESK_SSH_CLIENT_TOKEN"), true, "tools/tran must require the scoped UniDesk SSH client token"); + assert.equal(tran.includes("/ws/ssh"), true, "tools/tran must use the frontend SSH WebSocket path"); + + const help = await execFileAsync(path.join(context.root, "tools/tran"), ["--help"], { cwd: context.root, timeout: 10_000 }); + const parsed = JSON.parse(help.stdout) as { ok?: boolean; supported?: string[]; valuesPrinted?: boolean }; + assert.equal(parsed.ok, true); + assert.equal(parsed.valuesPrinted, false); + assert.equal(parsed.supported?.some((line) => line.includes("script")), true); + + return { name: "90-runner-image-tools", tests: ["runner image installs required CLI tools", "gitbundle tran tools are executable and documented"] }; }; function installedApkPackages(containerfile: string): Set { diff --git a/tools/tran b/tools/tran new file mode 100755 index 0000000..4cc508c --- /dev/null +++ b/tools/tran @@ -0,0 +1,270 @@ +#!/usr/bin/env bun + +const defaultFrontendPort = 18081; +const sshInputChunkBytes = 32 * 1024; + +function jsonHelp() { + return { + ok: true, + tool: "tran", + purpose: "AgentRun runner UniDesk SSH passthrough over frontend /ws/ssh", + requiredEnv: ["UNIDESK_SSH_CLIENT_TOKEN", "UNIDESK_MAIN_SERVER_IP or UNIDESK_FRONTEND_URL"], + supported: [ + "tran ", + "tran argv ", + "tran script -- ''", + "tran :/absolute/workspace script -- ''", + "tran :k3s kubectl ", + "tran :k3s script -- ''", + "tran :k3s::[:container] argv ", + "tran :k3s::[:container] script -- ''", + ], + unsupported: ["apply-patch", "upload", "download", "Windows win/ps/cmd routes"], + valuesPrinted: false, + }; +} + +function writeJson(value, stream = process.stdout) { + stream.write(`${JSON.stringify(value)}\n`); +} + +function fail(failureKind, message, details = {}, exitCode = 2) { + writeJson({ ok: false, failureKind, message, ...details, valuesPrinted: false }, process.stderr); + process.exit(exitCode); +} + +function shellQuote(value) { + return `'${String(value).replace(/'/g, `'"'"'`)}'`; +} + +function shellArgv(args) { + if (args.length === 0) return ""; + return args.map(shellQuote).join(" "); +} + +function baseUrlFromEnv(env) { + const explicit = (env.UNIDESK_FRONTEND_URL || env.UNIDESK_MAIN_SERVER_URL || "").trim(); + if (explicit) return explicit.replace(/\/+$/g, ""); + const host = (env.UNIDESK_MAIN_SERVER_IP || env.UNIDESK_MAIN_SERVER_HOST || env.CODE_QUEUE_DEV_CONTAINER_MASTER_HOST || "").trim(); + if (!host) return null; + if (host.startsWith("http://") || host.startsWith("https://")) return host.replace(/\/+$/g, ""); + if (/:[0-9]+$/u.test(host)) return `http://${host}`; + const port = Number(env.UNIDESK_FRONTEND_PORT || env.UNIDESK_MAIN_SERVER_PORT || defaultFrontendPort); + return `http://${host}:${Number.isInteger(port) && port > 0 ? port : defaultFrontendPort}`; +} + +function websocketUrl(baseUrl) { + const url = new URL("/ws/ssh", baseUrl); + url.protocol = url.protocol === "https:" ? "wss:" : "ws:"; + return url.toString(); +} + +function parseRoute(raw) { + const parts = String(raw).split(":"); + const providerId = parts.shift() || ""; + if (!providerId) fail("schema-invalid", "tran route requires a provider id", { route: raw }); + if (parts.length === 0) return { providerId, plane: "host", workspace: null, namespace: null, resource: null, container: null, raw }; + if (parts[0] === "win") fail("unsupported-operation", "AgentRun tran does not support Windows routes yet", { route: raw }); + if (parts[0] === "k3s") { + return { + providerId, + plane: "k3s", + workspace: null, + namespace: parts[1] || null, + resource: parts[2] || null, + container: parts[3] || null, + raw, + }; + } + const workspace = parts.join(":"); + if (!workspace.startsWith("/")) fail("schema-invalid", "host workspace routes must be absolute paths", { route: raw }); + return { providerId, plane: "host", workspace, namespace: null, resource: null, container: null, raw }; +} + +async function readStdinText() { + const chunks = []; + for await (const chunk of Bun.stdin.stream()) chunks.push(Buffer.from(chunk)); + return Buffer.concat(chunks).toString("utf8"); +} + +async function scriptCommand(args) { + if (args[0] === "--") { + const rest = args.slice(1); + if (rest.length === 0) return await readStdinText(); + if (rest.length === 1) return rest[0]; + return shellArgv(rest); + } + if (args.length === 0) return await readStdinText(); + return shellArgv(args); +} + +async function hostCommand(route, args) { + if (args.length === 0) return { command: null, cwd: route.workspace, tty: true }; + const op = args[0]; + if (op === "apply-patch" || op === "upload" || op === "download") { + fail("unsupported-operation", `AgentRun tran does not support ${op}; use host/source controlled tools outside the runner for that operation`, { operation: op }); + } + if (op === "script" || op === "shell") return { command: await scriptCommand(args.slice(1)), cwd: route.workspace, tty: false }; + if (op === "argv") return { command: shellArgv(args.slice(1)), cwd: route.workspace, tty: false }; + return { command: shellArgv(args), cwd: route.workspace, tty: false }; +} + +function k3sExecPrefix(route) { + const base = ["env", "KUBECONFIG=/etc/rancher/k3s/k3s.yaml", "kubectl", "exec"]; + if (route.namespace) base.push("-n", route.namespace); + if (!route.resource) fail("schema-invalid", "k3s workload routes require namespace and resource", { route: route.raw }); + base.push(route.resource); + if (route.container) base.push("-c", route.container); + base.push("--"); + return base; +} + +async function k3sCommand(route, args) { + const op = args[0] || "kubectl"; + if (op === "apply-patch" || op === "upload" || op === "download") { + fail("unsupported-operation", `AgentRun tran does not support ${op}; use host/source controlled tools outside the runner for that operation`, { operation: op }); + } + if (!route.resource) { + if (op === "kubectl") return { command: shellArgv(["env", "KUBECONFIG=/etc/rancher/k3s/k3s.yaml", "kubectl", ...args.slice(1)]), cwd: null, tty: false }; + if (op === "script" || op === "shell") { + const script = await scriptCommand(args.slice(1)); + return { command: `export KUBECONFIG=/etc/rancher/k3s/k3s.yaml; ${script}`, cwd: null, tty: false }; + } + if (op === "argv") return { command: shellArgv(["env", "KUBECONFIG=/etc/rancher/k3s/k3s.yaml", ...args.slice(1)]), cwd: null, tty: false }; + return { command: shellArgv(["env", "KUBECONFIG=/etc/rancher/k3s/k3s.yaml", ...args]), cwd: null, tty: false }; + } + if (op === "script" || op === "shell") { + const script = await scriptCommand(args.slice(1)); + return { command: shellArgv([...k3sExecPrefix(route), "sh", "-lc", script]), cwd: null, tty: false }; + } + if (op === "argv") return { command: shellArgv([...k3sExecPrefix(route), ...args.slice(1)]), cwd: null, tty: false }; + return { command: shellArgv([...k3sExecPrefix(route), ...args]), cwd: null, tty: false }; +} + +async function buildOpenPayload(argv) { + if (argv.length === 0) fail("schema-invalid", "tran requires a route", { help: jsonHelp() }); + const route = parseRoute(argv[0]); + const parsed = route.plane === "k3s" ? await k3sCommand(route, argv.slice(1)) : await hostCommand(route, argv.slice(1)); + return { + providerId: route.providerId, + command: parsed.command || undefined, + cwd: parsed.cwd || undefined, + tty: parsed.tty === true, + cols: Number(process.stdout.columns) > 0 ? Number(process.stdout.columns) : 100, + rows: Number(process.stdout.rows) > 0 ? Number(process.stdout.rows) : 30, + openTimeoutMs: Math.max(15000, Math.min(Number(process.env.UNIDESK_SSH_OPEN_TIMEOUT_MS || 60000), 60000)), + runtimeTimeoutMs: Math.max(1000, Math.min(Number(process.env.UNIDESK_SSH_RUNTIME_TIMEOUT_MS || process.env.UNIDESK_TRAN_RUNTIME_TIMEOUT_MS || 60000), 60000)), + stdinEotOnEnd: true, + route: route.raw, + }; +} + +async function main() { + const argv = process.argv.slice(2); + if (argv[0] === "--help" || argv[0] === "help" || argv[0] === "-h") { + writeJson(jsonHelp()); + return; + } + const token = (process.env.UNIDESK_SSH_CLIENT_TOKEN || "").trim(); + if (!token) fail("secret-unavailable", "UNIDESK_SSH_CLIENT_TOKEN is required for runner-side tran"); + const baseUrl = baseUrlFromEnv(process.env); + if (!baseUrl) fail("schema-invalid", "UNIDESK_MAIN_SERVER_IP, UNIDESK_MAIN_SERVER_HOST, or UNIDESK_FRONTEND_URL is required for runner-side tran"); + const open = await buildOpenPayload(argv); + await runWebSocket(open, websocketUrl(baseUrl), token); +} + +async function runWebSocket(open, url, token) { + const ws = new WebSocket(url, { headers: { authorization: `Bearer ${token}` } }); + let exitCode = 255; + let canSend = false; + let sessionReady = false; + let settled = false; + const pending = []; + const pendingInput = []; + + const send = (value) => { + const text = JSON.stringify(value); + if (!canSend || ws.readyState !== WebSocket.OPEN) pending.push(text); + else ws.send(text); + }; + const sendInput = (value) => { + const text = JSON.stringify(value); + if (!sessionReady || ws.readyState !== WebSocket.OPEN) pendingInput.push(text); + else ws.send(text); + }; + const flush = () => { + while (pending.length > 0 && ws.readyState === WebSocket.OPEN) ws.send(pending.shift()); + }; + const flushInput = () => { + if (!sessionReady || ws.readyState !== WebSocket.OPEN) return; + while (pendingInput.length > 0) ws.send(pendingInput.shift()); + }; + const finish = (code) => { + if (settled) return; + settled = true; + clearTimeout(openTimer); + clearTimeout(runtimeTimer); + process.exit(code); + }; + const openTimer = setTimeout(() => { + if (sessionReady || settled) return; + process.stderr.write("unidesk runner tran timed out waiting for provider session\n"); + exitCode = 255; + try { ws.close(); } catch {} + }, open.openTimeoutMs); + const runtimeTimer = setTimeout(() => { + if (settled) return; + process.stderr.write(`UNIDESK_TRAN_TIMEOUT_HINT ${JSON.stringify({ code: "tran-runtime-timeout", level: "warning", route: open.route, timeoutMs: open.runtimeTimeoutMs, message: "tran exceeded the runtime limit; use short query plus poll semantics" })}\n`); + exitCode = 124; + try { ws.close(); } catch {} + finish(124); + }, open.runtimeTimeoutMs); + + ws.addEventListener("open", () => { + canSend = true; + send({ type: "ssh.open", providerId: open.providerId, command: open.command, cwd: open.cwd, tty: open.tty, cols: open.cols, rows: open.rows }); + flush(); + }); + ws.addEventListener("message", (event) => { + let message; + const text = typeof event.data === "string" ? event.data : Buffer.from(event.data).toString("utf8"); + try { + message = JSON.parse(text); + } catch { + process.stderr.write(`${text}\n`); + return; + } + if (message.type === "ssh.dispatched") return; + if (message.type === "ssh.opened") { + sessionReady = true; + clearTimeout(openTimer); + sendInput({ type: "ssh.input", data: Buffer.from([4]).toString("base64"), encoding: "base64" }); + sendInput({ type: "ssh.eof" }); + flushInput(); + return; + } + if (message.type === "ssh.data") { + const chunk = Buffer.from(String(message.data || ""), message.encoding === "base64" ? "base64" : "utf8"); + if (message.stream === "stderr") process.stderr.write(chunk); + else process.stdout.write(chunk); + return; + } + if (message.type === "ssh.error") { + process.stderr.write(`${String(message.message || "ssh bridge error")}\n`); + exitCode = 255; + try { ws.close(); } catch {} + return; + } + if (message.type === "ssh.exit") { + exitCode = Number.isInteger(message.exitCode) ? Number(message.exitCode) : 255; + try { ws.close(); } catch {} + } + }); + ws.addEventListener("close", () => finish(exitCode)); + ws.addEventListener("error", () => { + process.stderr.write("unidesk runner tran websocket error\n"); + finish(255); + }); +} + +await main(); diff --git a/tools/trans b/tools/trans new file mode 100755 index 0000000..9da7a1e --- /dev/null +++ b/tools/trans @@ -0,0 +1,5 @@ +#!/bin/sh +set -eu + +self_dir=$(CDPATH= cd -- "$(dirname -- "$0")" && pwd) +exec "$self_dir/tran" "$@"