diff --git a/AGENTS.md b/AGENTS.md index fd902dd..994e9ac 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -39,6 +39,7 @@ AgentRun 是面向 UniDesk 与 HWLAB 的共享 Agent 执行基础设施。本仓 ## Critical CLI Spec Rule - P0: AgentRun CLI 和服务开发必须遵循 UniDesk `cli-spec` 原则:默认 JSON 输出、禁止空输出伪成功、禁止长阻塞 CLI、日志可见、配置显式校验、稳定跨服务边界优先使用 RESTful API。G14 非交互 route 优先使用 `./scripts/agentrun ...` launcher;它只负责定位 Bun 并转入 `scripts/agentrun-cli.ts`。 +- P0: 用户级 Session 续跑只使用 `sessions send `;CLI 只做 render-only client,manager REST `/api/v1/sessions/:sessionId/send` 按 durable session 状态自动决定内部 `steer` 或新 `turn`。`sessions turn` / `sessions steer` 只能作为隐藏兼容 alias 或低层诊断入口,不能出现在默认调度、帮助文档或恢复建议中。 - P0: 一旦新增 CLI,入口文件必须保持轻量,具体实现拆入 `scripts/src/`;长任务必须快速返回,并提供 status/log/event 轮询。 ## Critical v0.1 Implementation Stack Rule diff --git a/docs/reference/spec-v01-agentrun-mgr.md b/docs/reference/spec-v01-agentrun-mgr.md index 383889e..4e6ac15 100644 --- a/docs/reference/spec-v01-agentrun-mgr.md +++ b/docs/reference/spec-v01-agentrun-mgr.md @@ -135,7 +135,9 @@ Manager 只承接 HWLAB v0.2 Code Agent 的通用执行事实,不承接 HWLAB | `executionPolicy` | 必填或由 manager 显式补齐默认值,至少包含 sandbox、approval、timeout、network 和 secretScope。 | | `traceSink` | 字段必须存在;可以为 `null` 或显式 sink。 | -`POST /api/v1/runs/:runId/commands` 必须支持 idempotency key。相同 key 且 payload hash 相同应返回既有 command;相同 key 但 payload hash 不同必须结构化失败。`type=turn` 是普通对话 command;`type=steer` 是面向同 run active turn 的运行中引导 command,payload 必须包含非空 `prompt`、`message` 或 `text`,普通 runner poll 不得把它当作新 turn 执行;`type=interrupt` 只保留 durable command 语义,业务 cancel 仍以 run/command cancel API 为权威。 +`POST /api/v1/sessions/:sessionId/send` 是用户级 Session 续跑的唯一 REST 入口。客户端只提交 prompt/payload、可选 run base 和 runner job override;manager 必须读取 durable session/run/command 状态后自动决定内部行为:只有 active `turn` command 已被 runner ack、run 处于 claimed/running 且 lease 未过期,才创建 `type=steer` command;pending/waiting-runner、stale lease、terminal 或无 active command 都必须创建新 run、`type=turn` command,并按请求创建 runner job。响应必须暴露 `decision`、`internalCommandType`、run/command/runnerJob 摘要、activeBefore 和 `valuesPrinted=false`。带 `dryRun=true` 时只返回 non-mutating plan,不得创建 session、PVC、run、command 或 runner job。 + +`POST /api/v1/runs/:runId/commands` 必须支持 idempotency key。相同 key 且 payload hash 相同应返回既有 command;相同 key 但 payload hash 不同必须结构化失败。`type=turn` 是普通对话 command;`type=steer` 是面向同 run active turn 的运行中引导 command,payload 必须包含非空 `prompt`、`message` 或 `text`,普通 runner poll 不得把它当作新 turn 执行;`type=interrupt` 只保留 durable command 语义,业务 cancel 仍以 run/command cancel API 为权威。`turn` / `steer` 是 manager 内部 command type 和低层诊断资源,不是用户级 CLI 分叉。 ## Tenant Policy Boundary @@ -165,6 +167,7 @@ Manager 只承接 HWLAB v0.2 Code Agent 的通用执行事实,不承接 HWLAB | `terminalStatus` | `completed`、`failed`、`blocked` 或 `cancelled`;没有 terminal event 时为 `null` 或 equivalent running 状态。 | | `completed` / `terminalSource` | `completed=true` 只能来自 terminal completed;`terminalSource` 标明来自 `terminal_status` event、run record 或暂无 terminal。 | | `reply` / `finalResponse` | 从 `assistant_message` 聚合的最终用户可见文本;若存在 `replyAuthority=true` 或 `final=true` 的 `assistant_message`,必须取最后一条作为 authoritative reply。没有 authoritative final 时,result 可以 fallback 到 terminal 前最后一条非空 assistant 文本,但必须在 `finalResponse` 暴露 `seq`、`source`、`replyAuthority`、`final`、`textTruncated` 和 `outputTruncated`,让消费侧知道它是可见性 fallback,不是 backend final authority。没有 terminal completed 时不得伪造 completed reply。 | +| `finalResponseAuthority` / `finalResponseFallback` / `needsContinuation` / `completionEvidence` | 必须在 result 顶层暴露最终回复权威性。`finalResponseAuthority` 只能是 `authoritative`、`fallback` 或 `missing`;terminal completed 但没有 authoritative final 时,`needsContinuation=true`,`completionEvidence` 必须说明原因并给出同 session 的 `sessions send` 恢复入口。 | | `finalAssistantSeq` / `finalAssistantSource` | 必须指向 result 本次选中的 assistant event;长 trace、steer 或 progress snapshot 场景不能让早期 assistant row 继续冒充最终摘要。 | | `finalAssistantTextTruncated` / `finalAssistantOutputTruncated` | 必须原样暴露被选中 assistant event 的截断标记;被选中的最终摘要截断时,消费侧应继续读 events 或 trace,而不是把截断隐藏成完整 final。 | | `failureKind` / `blocker` | 结构化失败分类和摘要;必须 redacted。 | @@ -180,7 +183,7 @@ Manager 只承接 HWLAB v0.2 Code Agent 的通用执行事实,不承接 HWLAB `GET /api/v1/sessions/:sessionId` 作为 session status 入口,必须在存在 active/last run 时透出同一套 `liveness` 和 `supervisor` 摘要;该摘要是观测辅助,不能替代 command terminal、run terminal 或 raw events 的事实来源。 -当 command 因 idle timeout、provider stream disconnect、runner stdio inactive 或其他非业务终态失败时,manager 的恢复建议必须面向指挥官而不是要求 worker 自行读 trace。指挥官应先读取 `result`、`events` 或 `sessions/:id/trace` 确认最后有效 activity、已完成修改和卡点;若 run/task 有可继续的 `sessionRef`,后续 prompt 必须用同一个 AgentRun session 通过 `send session/` 或 `steer session/` 续跑,并在 prompt 中写入管理者从 trace 得出的下一步。只有旧任务没有 `sessionRef`、session 已 evicted、或同 session 已证明不可恢复时,才创建带管理者摘要的新任务。 +当 command 因 idle timeout、provider stream disconnect、runner stdio inactive、completed-without-authoritative-final 或其他非业务终态失败时,manager 的恢复建议必须面向指挥官而不是要求 worker 自行读 trace。指挥官应先读取 `result`、`events` 或 `sessions/:id/trace` 确认最后有效 activity、已完成修改和卡点;若 run/task 有可继续的 `sessionRef`,后续 prompt 必须用同一个 AgentRun session 通过 `sessions send ` 续跑,并在 prompt 中写入管理者从 trace 得出的下一步。只有旧任务没有 `sessionRef`、session 已 evicted、或同 session 已证明不可恢复时,才创建带管理者摘要的新任务。 当 `commandId` 已指定,result envelope 必须只聚合该 command 的 assistant/output/error/terminal 事件;同一 run 的其他 command reply 不能串入当前 command result。未指定 `commandId` 时可默认选择最新 command。 diff --git a/docs/reference/spec-v01-cli.md b/docs/reference/spec-v01-cli.md index 48632dd..fdbaa2f 100644 --- a/docs/reference/spec-v01-cli.md +++ b/docs/reference/spec-v01-cli.md @@ -74,9 +74,8 @@ CLI 官方 TypeScript 入口固定为 `scripts/agentrun-cli.ts`。在 G14 非交 ./scripts/agentrun queue refresh [--dry-run] [--full|--raw] ./scripts/agentrun sessions ps [--state default|running|unread|terminal|idle|all] [--profile codex|deepseek|minimax-m3|dsflash-go|M3] [--reader-id ] ./scripts/agentrun sessions show [--reader-id ] -./scripts/agentrun sessions turn [sessionId] [--json-stdin|--json-file ] [--prompt-stdin|--prompt-file |--prompt ] [--profile codex|deepseek|minimax-m3|dsflash-go|M3] [--runner-json-stdin|--runner-json-file ] [--no-runner-job] -./scripts/agentrun sessions turn [sessionId] --aipod [--prompt-stdin|--prompt-file |--prompt ] [--runner-json-stdin|--runner-json-file ] [--no-runner-job] -./scripts/agentrun sessions steer [--prompt-stdin|--prompt-file |--prompt ] +./scripts/agentrun sessions send [sessionId] [--json-stdin|--json-file ] [--prompt-stdin|--prompt-file |--prompt ] [--profile codex|deepseek|minimax-m3|dsflash-go|M3] [--runner-json-stdin|--runner-json-file ] [--no-runner-job] [--dry-run] +./scripts/agentrun sessions send [sessionId] --aipod [--prompt-stdin|--prompt-file |--prompt ] [--runner-json-stdin|--runner-json-file ] [--no-runner-job] [--dry-run] ./scripts/agentrun sessions cancel [--reason ] ./scripts/agentrun sessions trace [--after-seq ] [--limit ] [--run-id ] [--include-output] [--seq |--event-id |--item-id ] [--detail-scan-pages ] [--full|--raw] ./scripts/agentrun sessions output [--after-seq ] [--limit ] [--run-id ] [--include-output] [--seq |--event-id |--item-id ] [--detail-scan-pages ] [--full|--raw] @@ -115,10 +114,11 @@ CLI 官方 TypeScript 入口固定为 `scripts/agentrun-cli.ts`。在 G14 非交 - `queue refresh` 只根据 Queue task 中保存的 Core run/command 引用回写 Queue attempt 状态,不读取 Core trace 反推 commander 或统计;带 `--dry-run` 时不得写回状态。 - `queue list/show/commander` 默认返回低噪声 summary,只显示 task/attempt/session ids、state、read cursor、stats 相关字段、compact supervisor 和 drill-down 命令;commander 的 supervisor 只能放 `phase`、last activity source seq/id、timeout budget 和恢复动作摘要,不得展开完整 payload、trace、tool command、stdout/stderr 或 runnerTrace。需要完整 task payload、resource bundle 或 metadata 时显式使用 `--full|--raw`;需要 trace/output 细节时继续按返回的 `sessionId`/`sourceSeq` 走 `sessions trace|output --seq/--event-id/--item-id --full`。 - `queue show` 不得返回或代理完整 output/trace;输出和 trace 只能通过返回的 `sessionPath` 对应 `sessions ...` 命令查询。 -- 需要提交较长 Queue task、dispatch body、run base 或 command payload 时,CLI 必须把 `--json-stdin` 作为首选入口,避免为了 heredoc/stdin 内容先写临时 dump 文件再传 `--json-file`;`--json-file` 只用于可复用、已受控的输入文件。`queue submit/dispatch --dry-run` 的 `next.confirm` 不得默认推荐 `--json-file`,有 JSON body 时应提示 `--json-stdin`;只有用户显式维护可复用文件时才使用 file fallback。`sessions turn` 的 runner job override 也必须支持 `--runner-json-stdin`。所有 stdin JSON 仍必须解析为 object,并在 dry-run 中只展示有界 body 摘要、bytes 和 keys。 +- 需要提交较长 Queue task、dispatch body、run base 或 command payload 时,CLI 必须把 `--json-stdin` 作为首选入口,避免为了 heredoc/stdin 内容先写临时 dump 文件再传 `--json-file`;`--json-file` 只用于可复用、已受控的输入文件。`queue submit/dispatch --dry-run` 的 `next.confirm` 不得默认推荐 `--json-file`,有 JSON body 时应提示 `--json-stdin`;只有用户显式维护可复用文件时才使用 file fallback。`sessions send` 的 runner job override 也必须支持 `--runner-json-stdin`。所有 stdin JSON 仍必须解析为 object,并在 dry-run 中只展示有界 body 摘要、bytes 和 keys。 - `sessions ps` 默认只显示 running 和 unread session;`--state all` 才显示历史 read session,避免旧 session 噪声淹没当前进度。 -- `sessions turn` 是异步 subagent 的受控 CLI 入口:短返回 run、command、runnerJob 和后续 poll/read/steer/cancel 命令,不等待模型完成。`--profile M3` 是 `minimax-m3` 的 CLI alias;profile 仍写入 canonical `backendProfile`,不得 fallback。 -- `sessions steer` 对当前 active run 创建 `type=steer` command;`sessions cancel` 通过 Session control 取消 active command 或 run;`sessions read` 写入 reader cursor,使 terminal session 从默认 ps 中消失。 +- `sessions send` 是异步 subagent 的唯一用户级受控 CLI 入口:短返回 manager 决策、内部 command type、run/command/runnerJob 摘要和后续 poll/read/cancel 命令,不等待模型完成。CLI 只做 render-only client,manager 的 `/api/v1/sessions/:sessionId/send` 读取 durable session 状态后自动决定内部创建 `type=steer` 还是新 `type=turn` + runner job。`--profile M3` 是 `minimax-m3` 的 CLI alias;profile 仍写入 canonical `backendProfile`,不得 fallback。 +- `sessions send --dry-run` 必须全路径 non-mutating,只返回将提交给 manager 的有界计划和 manager 根据当前 session 状态可判断的 `decision`,不得创建 session、PVC、run、command 或 runner job。`sessions turn` / `sessions steer` 只能作为隐藏兼容 alias 或低层诊断入口;兼容 alias 也必须调用同一个 send REST 路径,不得强制内部 command type,不得出现在默认 help、恢复建议或调度者工作流中。 +- `sessions cancel` 通过 Session control 取消 active command 或 run;`sessions read` 写入 reader cursor,使 terminal session 从默认 ps 中消失。 - `sessions output` 与 `sessions trace` 是输出和 trace 的唯一 CLI 查询入口;不得新增 `queue output` 或 `queue trace` 兼容命令。 - `sessions output` 与 `sessions trace` 默认必须按渐进披露输出低噪声 JSON:只展示 `assistant_message` 与 `tool_call`/`error` 摘要,`command_output`、`backend_status`、raw event、runnerTrace 和大 stdout/stderr 只进入 `suppressedEvents` 计数与 bytes,不得默认展开正文。需要查看工具输出、backend_status 或原始 event 时,必须通过默认摘要中的 `detailCommands`,或显式使用 `--seq `、`--event-id `、`--item-id `、`--include-output`、`--full`/`--raw` 做定点展开;默认摘要生成的 `detailCommands` 必须带上能定位该 event 的最小 `--after-seq`/`--limit` hint,避免按 id 拉详情时重新扫描长 trace。这样保证默认不爆上下文,同时按 id/seq 可完整追溯。 @@ -175,10 +175,10 @@ CLI 官方 TypeScript 入口固定为 `scripts/agentrun-cli.ts`。在 G14 非交 | Queue CLI | 已实现/Q1 | 已提供 `queue submit/list/show/stats/commander/read/cancel`,通过 manager REST 访问 Queue task 和 stats,不直连 Postgres。 | | Queue dispatch/refresh CLI | 已实现/Q2 | `queue dispatch` 受控创建 Core run/command/runner job;`queue refresh` 从 Core run/command 终态回写 Queue task/latestAttempt。 | | 本地 server 生命周期 CLI | 已实现/Q2 hardening | `server start` 默认后台短返回,`server status/stop` 提供 pid、port、logPath 和 readiness 可见性;`--foreground` 保留给容器/显式调试。 | -| Session CLI | 已实现/Q3 | 已提供 `sessions ps/show/turn/steer/cancel/trace/output/read`;默认 ps 只显示 running/unread,terminal 后自动 unread,read cursor 由 CLI 标记。 | +| Session CLI | 已实现/Q3 | 已提供 `sessions ps/show/send/cancel/trace/output/read`;默认 ps 只显示 running/unread,terminal 后自动 unread,read cursor 由 CLI 标记。`turn/steer` 仅保留隐藏兼容 alias 和低层 command type。 | | CLI 测试规格 | 已定义/已验证主闭环 | 综合联调见 [spec-v01-validation.md](spec-v01-validation.md);每次发布仍按手动交互验收复跑。 | | `deepseek` profile CLI | 已实现/已通过主闭环 | `secrets codex render --profile deepseek`、`backends list`、`runner start --backend`、`runner job` 和 JSON 错误可见性已实现;真实 CLI/RESTful 联调已通过 `codex -> deepseek -> codex` 切换主闭环。 | | Provider profile 管理 CLI | 已实现 | `provider-profiles list/show/remove/set-key/validate` 调用 manager REST API,用于 HWLAB 委托和 operator 验收;输出必须持续保持 Secret/API Key 脱敏。 | | Tool credential 管理 CLI | 已实现 | `tool-credentials list/show/set-github-ssh` 调用 manager REST API,用于 Artificer GitHub SSH Secret bootstrap;输出只包含 SecretRef、key presence、bytes 和 hash suffix。 | -| `minimax-m3` profile CLI | 已实现/待真实主闭环 | `secrets codex render --profile minimax-m3`、`backends list`、`runner start --backend`、`runner job`、`sessions turn --profile minimax-m3|M3` 和 JSON 错误可见性已实现;真实 CLI/RESTful 联调需要按 `codex -> deepseek -> minimax-m3 -> codex` 手动验收。 | -| `dsflash-go` profile CLI | 已实现/待真实主闭环 | `secrets codex render --profile dsflash-go --model-catalog-file`、`backends list`、`runner start --backend`、`runner job`、`sessions turn --profile dsflash-go` 和 JSON 错误可见性已实现;真实 CLI/RESTful 联调需要按 `codex -> deepseek -> minimax-m3 -> dsflash-go -> codex` 手动验收,并确认 compact 404 分类为 `provider-compact-unsupported`。 | +| `minimax-m3` profile CLI | 已实现/待真实主闭环 | `secrets codex render --profile minimax-m3`、`backends list`、`runner start --backend`、`runner job`、`sessions send --profile minimax-m3|M3` 和 JSON 错误可见性已实现;真实 CLI/RESTful 联调需要按 `codex -> deepseek -> minimax-m3 -> codex` 手动验收。 | +| `dsflash-go` profile CLI | 已实现/待真实主闭环 | `secrets codex render --profile dsflash-go --model-catalog-file`、`backends list`、`runner start --backend`、`runner job`、`sessions send --profile dsflash-go` 和 JSON 错误可见性已实现;真实 CLI/RESTful 联调需要按 `codex -> deepseek -> minimax-m3 -> dsflash-go -> codex` 手动验收,并确认 compact 404 分类为 `provider-compact-unsupported`。 | diff --git a/scripts/src/cli.ts b/scripts/src/cli.ts index ffce01a..398d534 100644 --- a/scripts/src/cli.ts +++ b/scripts/src/cli.ts @@ -10,7 +10,7 @@ import { runOnce } from "../../src/runner/run-once.js"; import { renderRunnerJobDryRun } from "../../src/runner/k8s-job.js"; import type { RunnerSessionPvcOptions } from "../../src/runner/k8s-job.js"; import { renderCodexProviderSecretPlan } from "./secret-render.js"; -import type { BackendProfile, CommandRecord, JsonRecord, JsonValue, RenderAipodInput, RenderedAipodQueueTask, RunRecord, SessionSummary } from "../../src/common/types.js"; +import type { BackendProfile, JsonRecord, JsonValue, RenderAipodInput, RenderedAipodQueueTask, RunRecord } from "../../src/common/types.js"; import { AgentRunError, errorToJson } from "../../src/common/errors.js"; import type { RunnerOnceOptions } from "../../src/runner/run-once.js"; import { backendProfileSpec, isBackendProfile } from "../../src/common/backend-profiles.js"; @@ -81,6 +81,7 @@ async function dispatch(args: ParsedArgs): Promise { if (group === "sessions" && command === "read" && id) return sessionRead(args, id); if (group === "sessions" && command === "trace" && id) return sessionEvents(args, id, "trace"); if (group === "sessions" && command === "output" && id) return sessionEvents(args, id, "output"); + if (group === "sessions" && command === "send") return sessionSend(args, id ?? null); if (group === "sessions" && command === "turn") return sessionTurn(args, id ?? null); if (group === "sessions" && command === "steer" && id) return sessionSteer(args, id); if (group === "sessions" && command === "cancel" && id) return sessionCancel(args, id); @@ -676,6 +677,77 @@ function summarizeSessionMutationResult(action: "session-cancel" | "session-read }; } +function summarizeSessionSendResult(result: JsonValue, sessionId: string, compatibilityAlias: "turn" | "steer" | null, profile: string, aipod?: string): JsonRecord { + const record = jsonRecordValue(result); + const run = jsonRecordValue(record?.run); + const command = jsonRecordValue(record?.command); + const activeBefore = jsonRecordValue(record?.activeBefore); + const runnerJob = jsonRecordValue(record?.runnerJob); + const dryRun = record?.dryRun === true; + const afterSeq = numberValue(jsonRecordValue(record?.supervisor)?.lastSeq) ?? 0; + return { + action: dryRun ? "session-send-plan" : "session-send", + sessionId, + profile, + ...(aipod ? { aipod } : {}), + compatibilityAlias, + dryRun, + mutation: record?.mutation === true, + decision: stringValue(record?.decision), + internalCommandType: stringValue(record?.internalCommandType), + activeBefore: activeBefore ? compactRecord(activeBefore, { keys: ["runId", "commandId", "commandState", "runStatus", "leaseExpiresAt", "leaseExpired", "reason"] }) : null, + run: summarizeRunRecord(run), + command: summarizeCommandRecord(command), + runnerJob: runnerJob ? compactRecord(runnerJob, { keys: ["action", "runId", "commandId", "attemptId", "runnerId", "namespace", "jobName", "image", "mutation"] }) : null, + fullResponseBytes: jsonByteLength(result), + valuesPrinted: false, + drillDownCommands: { + show: `./scripts/agentrun sessions show ${sessionId} --reader-id cli`, + trace: `./scripts/agentrun sessions trace ${sessionId} --after-seq ${afterSeq} --limit 100`, + output: `./scripts/agentrun sessions output ${sessionId} --after-seq ${afterSeq} --limit 100`, + read: `./scripts/agentrun sessions read ${sessionId} --reader-id cli`, + cancel: `./scripts/agentrun sessions cancel ${sessionId}`, + }, + expandedOutput: { + fullFlag: "--full", + rawFlag: "--raw", + note: dryRun ? "Dry-run is non-mutating; remove --dry-run to send." : "Use --full on the original invocation for the full manager response.", + }, + }; +} + +async function sessionRunnerJobBody(args: ParsedArgs, defaults: JsonRecord = {}): Promise { + const runnerOverrides = await optionalRunnerJsonFile(args); + const body = { ...defaults, ...runnerOverrides } as JsonRecord; + copyOptionalFlag(args, body, "image"); + copyOptionalFlag(args, body, "namespace"); + copyOptionalFlag(args, body, "attempt-id", "attemptId"); + copyOptionalFlag(args, body, "runner-id", "runnerId"); + copyOptionalFlag(args, body, "source-commit", "sourceCommit"); + copyRunnerManagerUrlFlag(args, body); + copyOptionalFlag(args, body, "service-account-name", "serviceAccountName"); + const runnerIdempotencyKey = optionalFlag(args, "runner-idempotency-key"); + if (runnerIdempotencyKey) body.idempotencyKey = runnerIdempotencyKey; + return body; +} + +async function ensureSessionForSend(args: ParsedArgs, sessionId: string, tenantId: string, projectId: string, profile: string): Promise { + try { + await client(args).get(`/api/v1/sessions/${encodeURIComponent(sessionId)}/storage`); + return; + } catch (error) { + if (!(error instanceof AgentRunError) || error.httpStatus !== 404) throw error; + } + const expiresInDays = Number(optionalFlag(args, "expires-in-days") ?? 30); + await client(args).post("/api/v1/sessions", { + sessionId, + tenantId, + projectId, + backendProfile: profile, + expiresAt: new Date(Date.now() + Math.max(1, expiresInDays) * 24 * 60 * 60 * 1000).toISOString(), + }); +} + interface QueueSummaryOptions { limit: number; } @@ -1145,7 +1217,7 @@ async function sessionCreate(args: ParsedArgs, positionalSessionId: string | nul show: `./scripts/agentrun sessions show ${sessionId} --reader-id cli`, storage: `./scripts/agentrun sessions storage ${sessionId}`, trace: `./scripts/agentrun sessions trace ${sessionId} --after-seq 0 --limit 100`, - turn: `./scripts/agentrun sessions turn ${sessionId} --prompt "..."`, + send: `./scripts/agentrun sessions send ${sessionId} --prompt "..."`, }, }; } @@ -1159,26 +1231,20 @@ async function sessionStorageDelete(args: ParsedArgs, sessionId: string): Promis } async function sessionTurn(args: ParsedArgs, positionalSessionId: string | null): Promise { + return sessionSend(args, positionalSessionId, { compatibilityAlias: "turn" }); +} + +async function sessionSteer(args: ParsedArgs, sessionId: string): Promise { + return sessionSend(args, sessionId, { compatibilityAlias: "steer" }); +} + +async function sessionSend(args: ParsedArgs, positionalSessionId: string | null, options: { compatibilityAlias?: "turn" | "steer" } = {}): Promise { const aipod = optionalFlag(args, "aipod") ?? optionalFlag(args, "aipod-spec"); - if (aipod) return sessionTurnWithAipod(args, positionalSessionId, aipod); + if (aipod) return sessionSendWithAipod(args, positionalSessionId, aipod, options); const body = await optionalJsonFile(args); const sessionId = positionalSessionId ?? optionalFlag(args, "session-id") ?? newSessionId(); const requestedProfile = optionalFlag(args, "profile") ?? optionalFlag(args, "backend-profile") ?? (typeof body.backendProfile === "string" ? String(body.backendProfile) : "codex"); const profile = normalizeProfile(requestedProfile); - if (positionalSessionId || optionalFlag(args, "session-id")) { - try { - await client(args).get(`/api/v1/sessions/${encodeURIComponent(sessionId)}/storage`); - } catch (error) { - const expiresInDays = Number(optionalFlag(args, "expires-in-days") ?? 30); - await client(args).post("/api/v1/sessions", { - sessionId, - tenantId: optionalFlag(args, "tenant-id") ?? "unidesk", - projectId: optionalFlag(args, "project-id") ?? "default", - backendProfile: profile, - expiresAt: new Date(Date.now() + Math.max(1, expiresInDays) * 24 * 60 * 60 * 1000).toISOString(), - }); - } - } const prompt = await readPrompt(args); body.tenantId = optionalFlag(args, "tenant-id") ?? stringField(body, "tenantId", "unidesk"); body.projectId = optionalFlag(args, "project-id") ?? stringField(body, "projectId", "default"); @@ -1192,39 +1258,20 @@ async function sessionTurn(args: ParsedArgs, positionalSessionId: string | null) const title = optionalFlag(args, "title"); if (title) sessionMetadata.title = title; body.sessionRef = { ...sessionRef, sessionId, metadata: sessionMetadata }; - const run = await client(args).post("/api/v1/runs", body) as RunRecord; - const commandBody: JsonRecord = { type: "turn", payload: { prompt } }; + const runnerBody = await sessionRunnerJobBody(args); + const sendBody: JsonRecord = { + run: body, + payload: { prompt }, + createRunnerJob: args.flags.get("no-runner-job") !== true, + runnerJob: runnerBody, + dryRun: args.flags.get("dry-run") === true, + }; const commandIdempotencyKey = optionalFlag(args, "command-idempotency-key") ?? optionalFlag(args, "idempotency-key"); - if (commandIdempotencyKey) commandBody.idempotencyKey = commandIdempotencyKey; - const command = await client(args).post(`/api/v1/runs/${encodeURIComponent(run.id)}/commands`, commandBody) as CommandRecord; - let runnerJob: JsonValue = null; - if (args.flags.get("no-runner-job") !== true) { - const runnerBody = await optionalRunnerJsonFile(args); - runnerBody.commandId = command.id; - copyOptionalFlag(args, runnerBody, "image"); - copyOptionalFlag(args, runnerBody, "namespace"); - copyOptionalFlag(args, runnerBody, "attempt-id", "attemptId"); - copyOptionalFlag(args, runnerBody, "runner-id", "runnerId"); - copyOptionalFlag(args, runnerBody, "source-commit", "sourceCommit"); - copyRunnerManagerUrlFlag(args, runnerBody); - copyOptionalFlag(args, runnerBody, "service-account-name", "serviceAccountName"); - const runnerIdempotencyKey = optionalFlag(args, "runner-idempotency-key"); - if (runnerIdempotencyKey) runnerBody.idempotencyKey = runnerIdempotencyKey; - runnerJob = await client(args).post(`/api/v1/runs/${encodeURIComponent(run.id)}/runner-jobs`, runnerBody); - } - return { action: "session-turn", sessionId, profile, run, command, runnerJob, pollCommands: { ps: `./scripts/agentrun sessions ps --reader-id cli --profile ${profile}`, show: `./scripts/agentrun sessions show ${sessionId} --reader-id cli`, trace: `./scripts/agentrun sessions trace ${sessionId} --after-seq 0 --limit 100`, output: `./scripts/agentrun sessions output ${sessionId} --after-seq 0 --limit 100`, read: `./scripts/agentrun sessions read ${sessionId} --reader-id cli`, steer: `./scripts/agentrun sessions steer ${sessionId} --prompt-file `, cancel: `./scripts/agentrun sessions cancel ${sessionId}` } }; -} - -async function sessionSteer(args: ParsedArgs, sessionId: string): Promise { - const session = await client(args).get(`/api/v1/sessions/${encodeURIComponent(sessionId)}${readerQuery(args)}`) as SessionSummary; - const runId = session.activeRunId ?? session.lastRunId; - if (!runId) throw new AgentRunError("schema-invalid", `session ${sessionId} has no run to steer`, { httpStatus: 2 }); - const prompt = await readPrompt(args); - const body: JsonRecord = { type: "steer", payload: { prompt } }; - const idempotencyKey = optionalFlag(args, "idempotency-key"); - if (idempotencyKey) body.idempotencyKey = idempotencyKey; - const command = await client(args).post(`/api/v1/runs/${encodeURIComponent(runId)}/commands`, body); - return { action: "session-steer", sessionId, runId, command }; + if (commandIdempotencyKey) sendBody.commandIdempotencyKey = commandIdempotencyKey; + if (args.flags.get("dry-run") !== true) await ensureSessionForSend(args, sessionId, body.tenantId as string, body.projectId as string, profile); + const result = await client(args).post(`/api/v1/sessions/${encodeURIComponent(sessionId)}/send`, sendBody); + if (wantsExpandedOutput(args)) return { action: "session-send", compatibilityAlias: options.compatibilityAlias ?? null, result: result as JsonValue, valuesPrinted: false }; + return summarizeSessionSendResult(result, sessionId, options.compatibilityAlias ?? null, profile); } async function sessionCancel(args: ParsedArgs, sessionId: string): Promise { @@ -1296,25 +1343,11 @@ async function submitQueueTaskWithAipod(args: ParsedArgs, aipod: string): Promis return client(args).post("/api/v1/queue/tasks", body); } -async function sessionTurnWithAipod(args: ParsedArgs, positionalSessionId: string | null, aipod: string): Promise { +async function sessionSendWithAipod(args: ParsedArgs, positionalSessionId: string | null, aipod: string, options: { compatibilityAlias?: "turn" | "steer" } = {}): Promise { const sessionId = positionalSessionId ?? optionalFlag(args, "session-id") ?? newSessionId(); const rendered = await renderAipodForCommand(args, aipod, positionalSessionId ? 3 : 2, { sessionId }); const task = rendered.queueTask; const profile = String(task.backendProfile); - if (positionalSessionId || optionalFlag(args, "session-id")) { - try { - await client(args).get(`/api/v1/sessions/${encodeURIComponent(sessionId)}/storage`); - } catch { - const expiresInDays = Number(optionalFlag(args, "expires-in-days") ?? 30); - await client(args).post("/api/v1/sessions", { - sessionId, - tenantId: task.tenantId, - projectId: task.projectId, - backendProfile: task.backendProfile, - expiresAt: new Date(Date.now() + Math.max(1, expiresInDays) * 24 * 60 * 60 * 1000).toISOString(), - }); - } - } const sessionRef = objectField(task as unknown as JsonRecord, "sessionRef", {}); const metadata = objectField(sessionRef, "metadata", {}); const title = optionalFlag(args, "title") ?? task.title; @@ -1330,26 +1363,21 @@ async function sessionTurnWithAipod(args: ParsedArgs, positionalSessionId: strin resourceBundleRef: task.resourceBundleRef, traceSink: { kind: "aipod-session", aipod, sessionId, valuesPrinted: false }, }; - const run = await client(args).post("/api/v1/runs", runBody) as RunRecord; - const commandBody: JsonRecord = { type: "turn", payload: task.payload }; + const runnerDefaults = jsonRecordValue(rendered.dispatchDefaults.runnerJob) ?? {}; + const runnerBody = await sessionRunnerJobBody(args, runnerDefaults); + const sendBody: JsonRecord = { + run: runBody, + payload: task.payload, + createRunnerJob: args.flags.get("no-runner-job") !== true, + runnerJob: runnerBody, + dryRun: args.flags.get("dry-run") === true, + }; const commandIdempotencyKey = optionalFlag(args, "command-idempotency-key") ?? optionalFlag(args, "idempotency-key"); - if (commandIdempotencyKey) commandBody.idempotencyKey = commandIdempotencyKey; - const command = await client(args).post(`/api/v1/runs/${encodeURIComponent(run.id)}/commands`, commandBody) as CommandRecord; - let runnerJob: JsonValue = null; - if (args.flags.get("no-runner-job") !== true) { - const runnerDefaults = jsonRecordValue(rendered.dispatchDefaults.runnerJob) ?? {}; - const runnerOverrides = await optionalRunnerJsonFile(args); - const runnerBody = { ...runnerDefaults, ...runnerOverrides, commandId: command.id } as JsonRecord; - copyOptionalFlag(args, runnerBody, "image"); - copyOptionalFlag(args, runnerBody, "namespace"); - copyOptionalFlag(args, runnerBody, "attempt-id", "attemptId"); - copyOptionalFlag(args, runnerBody, "runner-id", "runnerId"); - copyOptionalFlag(args, runnerBody, "source-commit", "sourceCommit"); - copyRunnerManagerUrlFlag(args, runnerBody); - copyOptionalFlag(args, runnerBody, "service-account-name", "serviceAccountName"); - runnerJob = await client(args).post(`/api/v1/runs/${encodeURIComponent(run.id)}/runner-jobs`, runnerBody); - } - return { action: "session-turn", aipod, sessionId, profile, run, command, runnerJob, valuesPrinted: false, pollCommands: { ps: `./scripts/agentrun sessions ps --reader-id cli --profile ${profile}`, show: `./scripts/agentrun sessions show ${sessionId} --reader-id cli`, trace: `./scripts/agentrun sessions trace ${sessionId} --after-seq 0 --limit 100`, output: `./scripts/agentrun sessions output ${sessionId} --after-seq 0 --limit 100`, read: `./scripts/agentrun sessions read ${sessionId} --reader-id cli`, steer: `./scripts/agentrun sessions steer ${sessionId} --prompt-file `, cancel: `./scripts/agentrun sessions cancel ${sessionId}` } }; + if (commandIdempotencyKey) sendBody.commandIdempotencyKey = commandIdempotencyKey; + if (args.flags.get("dry-run") !== true) await ensureSessionForSend(args, sessionId, String(task.tenantId), String(task.projectId), profile); + const result = await client(args).post(`/api/v1/sessions/${encodeURIComponent(sessionId)}/send`, sendBody); + if (wantsExpandedOutput(args)) return { action: "session-send", compatibilityAlias: options.compatibilityAlias ?? null, aipod, result: result as JsonValue, valuesPrinted: false }; + return summarizeSessionSendResult(result, sessionId, options.compatibilityAlias ?? null, profile, aipod); } async function submitQueueTask(args: ParsedArgs): Promise { @@ -2011,8 +2039,7 @@ function help(args: ParsedArgs, group?: string): JsonRecord { "sessions storage ", "sessions storage --delete", "sessions show [--reader-id ]", - "sessions turn [sessionId] [--aipod |--json-stdin|--json-file ] [--prompt-stdin|--prompt-file |--prompt ] [--profile codex|deepseek|minimax-m3|dsflash-go||M3] [--runner-json-stdin|--runner-json-file ] [--no-runner-job]", - "sessions steer [--prompt-stdin|--prompt-file |--prompt ]", + "sessions send [sessionId] [--aipod |--json-stdin|--json-file ] [--prompt-stdin|--prompt-file |--prompt ] [--profile codex|deepseek|minimax-m3|dsflash-go||M3] [--runner-json-stdin|--runner-json-file ] [--no-runner-job] [--dry-run]", "sessions cancel [--reason ] [--full|--raw]", "sessions trace [--after-seq ] [--limit ] [--run-id ] [--summary-chars ] [--include-output] [--seq |--event-id |--item-id ] [--detail-scan-pages ] [--full|--raw]", "sessions output [--after-seq ] [--limit ] [--run-id ] [--summary-chars ] [--include-output] [--seq |--event-id |--item-id ] [--detail-scan-pages ] [--full|--raw]", diff --git a/src/mgr/result.ts b/src/mgr/result.ts index 7320ef8..3fee284 100644 --- a/src/mgr/result.ts +++ b/src/mgr/result.ts @@ -56,8 +56,11 @@ export async function buildRunResult(store: AgentRunStore, runId: string, comman const failureMessage = resultFailureMessage(run, command, scopedEvents, terminal); const failureDetails = resultFailureDetails(scopedEvents, terminal); const reply = assistantReply(scopedEvents); + const responseAuthority = finalResponseAuthority(reply); + const needsContinuation = terminal === "completed" && responseAuthority !== "authoritative"; + const completionEvidence = completionEvidenceSummary({ terminal, terminalSource, reply, responseAuthority, needsContinuation, sessionId: run.sessionRef?.sessionId ?? null }); const blocker = terminal === "blocked" || terminal === "failed" ? { failureKind, message: failureMessage, details: failureDetails } : null; - const liveness = livenessSnapshot(run, command, events, scopedEvents, terminal, failureKind, failureMessage); + const liveness = livenessSnapshot(run, command, events, scopedEvents, terminal, failureKind, failureMessage, { responseAuthority, needsContinuation }); const terminalClassification = terminalClassificationSummary({ terminal, terminalSource, failureKind, failureMessage, liveness }); const diagnosis = runDiagnosis({ run, command, latestJob, events, terminalClassification, liveness, terminalStatus: terminal, failureKind, failureMessage }); const steerDelivery = command?.type === "steer" ? steerDeliverySummary(events, command.id) : null; @@ -74,6 +77,10 @@ export async function buildRunResult(store: AgentRunStore, runId: string, comman terminalStatus: terminal, terminalSource, completed: terminal === "completed", + finalResponseAuthority: responseAuthority, + finalResponseFallback: responseAuthority === "fallback", + needsContinuation, + completionEvidence, reply: reply.text, finalResponse: { text: reply.text, @@ -81,6 +88,9 @@ export async function buildRunResult(store: AgentRunStore, runId: string, comman source: reply.source, final: reply.final, replyAuthority: reply.replyAuthority, + authority: responseAuthority, + fallback: responseAuthority === "fallback", + needsContinuation, textTruncated: reply.textTruncated, outputTruncated: reply.outputTruncated, }, @@ -110,7 +120,7 @@ export async function buildRunResult(store: AgentRunStore, runId: string, comman }; } -function livenessSnapshot(run: RunRecord, command: CommandRecord | null, events: RunEvent[], scopedEvents: RunEvent[], terminal: TerminalStatus | null, failureKind: FailureKind | null, failureMessage: string | null): JsonRecord { +function livenessSnapshot(run: RunRecord, command: CommandRecord | null, events: RunEvent[], scopedEvents: RunEvent[], terminal: TerminalStatus | null, failureKind: FailureKind | null, failureMessage: string | null, completion: { responseAuthority: string; needsContinuation: boolean }): JsonRecord { const nowMs = Date.now(); const active = terminal === null && !runIsTerminal(run) && !commandIsTerminal(command); const lastEvent = events.at(-1) ?? null; @@ -143,15 +153,16 @@ function livenessSnapshot(run: RunRecord, command: CommandRecord | null, events: lease, transportDisconnect: transportDisconnect ? livenessActivitySummary(transportDisconnect, nowMs) : null, retryInterruption: retryInterruption ? livenessActivitySummary(retryInterruption, nowMs) : null, - recoveryActions: recoveryActions({ run, command, afterSeq, active, terminal, failureKind, failureMessage }), + recoveryActions: recoveryActions({ run, command, afterSeq, active, terminal, failureKind, failureMessage, needsContinuation: completion.needsContinuation, finalResponseAuthority: completion.responseAuthority }), valuesPrinted: false, }; } function livenessPhase(input: { active: boolean; command: CommandRecord | null; lastVisibleActivity: RunEvent | null; leaseExpired: boolean | null; transportDisconnect: RunEvent | null; timeoutBudget: JsonRecord; lastActivity: JsonRecord | null }): string { if (!input.active) return "terminal"; - if (input.command?.state === "pending") return "waiting-runner"; + if (input.command?.state === "pending" && input.leaseExpired === true) return "waiting-runner-stale-lease"; if (input.leaseExpired === true) return "runner-heartbeat-stale"; + if (input.command?.state === "pending") return "waiting-runner"; if (input.transportDisconnect) return "transport-disconnected"; if (input.lastVisibleActivity?.type === "tool_call") { const status = input.lastVisibleActivity.payload.status; @@ -428,8 +439,8 @@ function ageMs(value: string, nowMs: number): number | null { return Number.isFinite(parsed) ? Math.max(0, nowMs - parsed) : null; } -function recoveryActions(input: { run: RunRecord; command: CommandRecord | null; afterSeq: number; active: boolean; terminal: TerminalStatus | null; failureKind: FailureKind | null; failureMessage: string | null }): JsonRecord[] { - const { run, command, afterSeq, active, terminal, failureKind, failureMessage } = input; +function recoveryActions(input: { run: RunRecord; command: CommandRecord | null; afterSeq: number; active: boolean; terminal: TerminalStatus | null; failureKind: FailureKind | null; failureMessage: string | null; needsContinuation: boolean; finalResponseAuthority: string }): JsonRecord[] { + const { run, command, afterSeq, active, terminal, failureKind, failureMessage, needsContinuation, finalResponseAuthority } = input; const sessionId = run.sessionRef?.sessionId ?? null; const traceCommand = sessionId ? `./scripts/agentrun sessions trace ${sessionId} --after-seq ${afterSeq} --limit 100 --run-id ${run.id}` : `./scripts/agentrun runs events ${run.id} --after-seq ${afterSeq} --limit 100 --summary`; const outputCommand = sessionId ? `./scripts/agentrun sessions output ${sessionId} --after-seq ${afterSeq} --limit 100 --run-id ${run.id}` : null; @@ -438,20 +449,55 @@ function recoveryActions(input: { run: RunRecord; command: CommandRecord | null; ]; if (outputCommand) actions.push({ action: "poll-output", runId: run.id, commandId: command?.id ?? null, afterSeq, command: outputCommand, valuesPrinted: false }); if (active) { - if (sessionId) actions.push({ action: "steer-session", sessionId, runId: run.id, commandId: command?.id ?? null, command: `./scripts/agentrun sessions steer ${sessionId} --prompt-stdin`, valuesPrinted: false }); + if (sessionId) actions.push({ action: "send-session", sessionId, runId: run.id, commandId: command?.id ?? null, command: `./scripts/agentrun sessions send ${sessionId} --prompt-stdin`, hint: "manager 会按当前 session 状态自动决定内部 steer 或新 turn", valuesPrinted: false }); if (command) actions.push({ action: "cancel-command", runId: run.id, commandId: command.id, command: `./scripts/agentrun commands cancel ${command.id} --reason `, valuesPrinted: false }); else actions.push({ action: "cancel-run", runId: run.id, command: `./scripts/agentrun runs cancel ${run.id} --reason `, valuesPrinted: false }); return actions; } + if (needsContinuation && sessionId) { + if (command) actions.push({ action: "inspect-result", runId: run.id, commandId: command.id, command: `./scripts/agentrun commands result ${command.id} --run-id ${run.id}`, valuesPrinted: false }); + actions.push({ action: "continue-session", reason: `final-response-${finalResponseAuthority}`, sessionId, command: `./scripts/agentrun sessions send ${sessionId} --prompt-stdin`, hint: "命令已 terminal completed,但没有 authoritative final response;管理者应先读 trace/output,再用同一 session 发送后续 prompt。", valuesPrinted: false }); + return actions; + } if (terminal === "failed" || terminal === "blocked" || terminal === "cancelled") { if (command) actions.push({ action: "inspect-result", runId: run.id, commandId: command.id, command: `./scripts/agentrun commands result ${command.id} --run-id ${run.id}`, valuesPrinted: false }); - if (sessionId) actions.push({ action: "resume-session", sessionId, command: `./scripts/agentrun sessions turn ${sessionId} --prompt-stdin`, valuesPrinted: false }); + if (sessionId) actions.push({ action: "continue-session", sessionId, command: `./scripts/agentrun sessions send ${sessionId} --prompt-stdin`, valuesPrinted: false }); if (failureKind === "backend-timeout") actions.push({ action: "split-task", reason: "backend-timeout", hint: "先由管理者读取 trace/result,总结下一步,再把后续 prompt 发到同一 session;必要时把大 patch / 长工具链拆成更短 turn。", failureMessage: failureMessage ? boundedTextSummary(failureMessage, { limitChars: 200 }).text as string : null, valuesPrinted: false }); - else actions.push({ action: "retry-or-split", reason: failureKind ?? "terminal", hint: "先读 trace/output 的 detail id,再决定 steer、重跑或拆分", valuesPrinted: false }); + else actions.push({ action: "retry-or-split", reason: failureKind ?? "terminal", hint: "先读 trace/output 的 detail id,再决定继续同 session、重跑或拆分", valuesPrinted: false }); } return actions; } +function finalResponseAuthority(reply: AssistantReplySummary): "authoritative" | "fallback" | "missing" { + if (reply.replyAuthority || reply.final) return "authoritative"; + return reply.text.length > 0 ? "fallback" : "missing"; +} + +function completionEvidenceSummary(input: { terminal: TerminalStatus | null; terminalSource: string; reply: AssistantReplySummary; responseAuthority: string; needsContinuation: boolean; sessionId: string | null }): JsonRecord { + const recommendedAction = input.needsContinuation && input.sessionId ? `./scripts/agentrun sessions send ${input.sessionId} --prompt-stdin` : null; + return { + terminalStatus: input.terminal, + terminalSource: input.terminalSource, + finalResponseAuthority: input.responseAuthority, + finalResponseSeq: input.reply.seq, + finalResponseSource: input.reply.source, + finalResponseFinal: input.reply.final, + finalResponseReplyAuthority: input.reply.replyAuthority, + finalResponseFallback: input.responseAuthority === "fallback", + needsContinuation: input.needsContinuation, + reason: completionEvidenceReason(input.responseAuthority, input.terminal), + recommendedAction, + valuesPrinted: false, + }; +} + +function completionEvidenceReason(responseAuthority: string, terminal: TerminalStatus | null): string { + if (terminal !== "completed") return "command is not completed"; + if (responseAuthority === "authoritative") return "terminal completed with authoritative assistant final response"; + if (responseAuthority === "fallback") return "terminal completed but only a non-authoritative assistant progress/output fallback was available"; + return "terminal completed but no assistant response text was available"; +} + function numberJsonValue(value: JsonValue | undefined): number | null { return typeof value === "number" && Number.isFinite(value) ? value : null; } diff --git a/src/mgr/server.ts b/src/mgr/server.ts index ad48694..595de59 100644 --- a/src/mgr/server.ts +++ b/src/mgr/server.ts @@ -6,8 +6,8 @@ import { openAgentRunStoreFromEnv } from "./store.js"; import { AgentRunError, errorToJson } from "../common/errors.js"; import { asRecord, validateBackendProfile, validateCreateCommand, validateCreateQueueTask, validateCreateRun, validateQueueTaskState, validateSessionListState } from "../common/validation.js"; import { isBackendProfile } from "../common/backend-profiles.js"; -import type { ApiErrorBody, ApiOkBody, JsonRecord, JsonValue, QueueTaskRecord, RunEvent } from "../common/types.js"; -import { createKubernetesRunnerJob } from "./kubernetes-runner-job.js"; +import type { ApiErrorBody, ApiOkBody, CommandRecord, JsonRecord, JsonValue, QueueTaskRecord, RunEvent, RunRecord, SessionRecord } from "../common/types.js"; +import { createKubernetesRunnerJob, type RunnerJobDefaults } from "./kubernetes-runner-job.js"; import { dispatchQueueTask, refreshQueueTaskFromCore } from "./queue-dispatch.js"; import { buildRunResult } from "./result.js"; import { runnerJobStatusSummary } from "./runner-job-status.js"; @@ -35,6 +35,21 @@ function sessionPvcOptionsForRequest(serverDefaults: { kubectlHandler?: import(" return pvcOptions(runnerJobDefaults); } +function runnerJobDefaultsForRequest(defaults: ManagerServerOptions["runnerJobDefaults"], sourceCommit: string): RunnerJobDefaults { + const namespace = defaults?.namespace ?? process.env.AGENTRUN_RUNTIME_NAMESPACE ?? "agentrun-v01"; + return { + namespace, + managerUrl: defaults?.managerUrl ?? process.env.AGENTRUN_INTERNAL_MGR_URL ?? `http://agentrun-mgr.${namespace}.svc.cluster.local:8080`, + image: defaults?.image ?? process.env.AGENTRUN_RUNNER_IMAGE ?? "", + sourceCommit, + ...optionalStringRecord("envIdentity", defaults?.envIdentity ?? process.env.AGENTRUN_ENV_IDENTITY), + ...optionalStringRecord("artifactCatalogFile", defaults?.artifactCatalogFile ?? process.env.AGENTRUN_ARTIFACT_CATALOG_FILE), + serviceAccountName: defaults?.serviceAccountName ?? process.env.AGENTRUN_RUNNER_SERVICE_ACCOUNT ?? "agentrun-v01-runner", + ...(defaults?.kubectlCommand ? { kubectlCommand: defaults.kubectlCommand } : {}), + ...(defaults?.unideskSshEndpointEnv ? { unideskSshEndpointEnv: defaults.unideskSshEndpointEnv } : {}), + }; +} + export interface ManagerServerOptions { store?: AgentRunStore; port?: number; @@ -341,6 +356,10 @@ async function route({ method, url, body, store, sourceCommit, authSummary, runn if (runId) input.runId = runId; return await store.listSessionOutput(sessionOutputMatch[1] ?? "", input) as unknown as JsonValue; } + const sessionSendMatch = path.match(/^\/api\/v1\/sessions\/([^/]+)\/send$/u); + if (method === "POST" && sessionSendMatch) { + return await sendToSession({ store, sessionId: sessionSendMatch[1] ?? "", body, sourceCommit, runnerJobDefaults }) as JsonValue; + } const sessionReadMatch = path.match(/^\/api\/v1\/sessions\/([^/]+)\/read$/u); if (method === "POST" && sessionReadMatch) { const record = body === null ? {} : asRecord(body, "read"); @@ -606,6 +625,144 @@ async function applyNamedAipodSpec(name: string, body: unknown, dir?: string): P return await applyAipodSpec(spec, dir) as JsonValue; } +async function sendToSession(input: { store: AgentRunStore; sessionId: string; body: unknown; sourceCommit: string; runnerJobDefaults?: ManagerServerOptions["runnerJobDefaults"] }): Promise { + const record = asRecord(input.body ?? {}, "sessionSend"); + const dryRun = record.dryRun === true; + const existing = await input.store.getSession(input.sessionId); + const active = existing ? await activeReceivableCommand(input.store, existing) : null; + const payload = sessionSendPayload(record); + const commandIdempotencyKey = optionalString(record.commandIdempotencyKey) ?? optionalString(record.idempotencyKey); + if (active) { + const commandBody: JsonRecord = { type: "steer", payload }; + if (commandIdempotencyKey) commandBody.idempotencyKey = commandIdempotencyKey; + const request = { method: "POST", path: `/api/v1/runs/${active.run.id}/commands`, commandType: "steer", payloadBytes: jsonByteLength(payload), valuesPrinted: false }; + if (dryRun) return sessionSendPlan(input.sessionId, "steer", active, request, null); + const command = await input.store.createCommand(active.run.id, validateCreateCommand(commandBody)); + return sessionSendResponse({ sessionId: input.sessionId, decision: "steer", run: active.run, command, runnerJob: null, activeBefore: active, dryRun: false }); + } + + const runRecord = asRecord(record.run ?? record.runBase ?? null, "sessionSend.run"); + const runBody = sessionSendRunBody(input.sessionId, runRecord); + const commandBody: JsonRecord = { type: "turn", payload }; + if (commandIdempotencyKey) commandBody.idempotencyKey = commandIdempotencyKey; + const runnerJobBody = record.runnerJob === undefined || record.runnerJob === null ? {} : asRecord(record.runnerJob, "sessionSend.runnerJob"); + const createRunnerJob = record.createRunnerJob !== false; + const request = { + method: "POST", + path: `/api/v1/sessions/${input.sessionId}/send`, + commandType: "turn", + runBytes: jsonByteLength(runBody), + payloadBytes: jsonByteLength(payload), + createRunnerJob, + runnerJobBytes: createRunnerJob ? jsonByteLength(runnerJobBody) : 0, + valuesPrinted: false, + }; + if (dryRun) return sessionSendPlan(input.sessionId, "turn", active, request, runBody); + const run = await input.store.createRun(validateCreateRun(runBody)); + const command = await input.store.createCommand(run.id, validateCreateCommand(commandBody)); + let runnerJob: JsonValue = null; + if (createRunnerJob) { + runnerJob = await createKubernetesRunnerJob({ + store: input.store, + runId: run.id, + input: { ...runnerJobBody, commandId: command.id } as never, + defaults: runnerJobDefaultsForRequest(input.runnerJobDefaults, input.sourceCommit), + }) as unknown as JsonValue; + } + return sessionSendResponse({ sessionId: input.sessionId, decision: "turn", run, command, runnerJob, activeBefore: active, dryRun: false }); +} + +async function activeReceivableCommand(store: AgentRunStore, session: SessionRecord): Promise<{ run: RunRecord; command: CommandRecord; reason: string; leaseExpired: boolean } | null> { + if (!session.activeRunId || !session.activeCommandId) return null; + const [run, command] = await Promise.all([store.getRun(session.activeRunId), store.getCommand(session.activeCommandId)]); + if (run.sessionRef?.sessionId !== session.sessionId || command.runId !== run.id) return null; + if (runIsTerminal(run) || commandIsTerminal(command)) return null; + if (command.type !== "turn") return null; + const leaseExpired = run.leaseExpiresAt ? Date.parse(run.leaseExpiresAt) <= Date.now() : false; + if (leaseExpired) return null; + if (command.state !== "acknowledged") return null; + if (run.status !== "claimed" && run.status !== "running") return null; + return { run, command, reason: "active-turn-running", leaseExpired }; +} + +function sessionSendPayload(record: JsonRecord): JsonRecord { + const payload = asJsonRecord(record.payload); + if (payload) return payload; + const prompt = optionalString(record.prompt) ?? optionalString(record.message) ?? optionalString(record.text); + if (!prompt) throw new AgentRunError("schema-invalid", "sessions send requires payload or non-empty prompt/message/text", { httpStatus: 400 }); + return { prompt }; +} + +function sessionSendRunBody(sessionId: string, runRecord: JsonRecord): JsonRecord { + const sessionRef = asJsonRecord(runRecord.sessionRef) ?? {}; + const metadata = asJsonRecord(sessionRef.metadata) ?? {}; + return { ...runRecord, sessionRef: { ...sessionRef, sessionId, metadata } }; +} + +function sessionSendPlan(sessionId: string, decision: "steer" | "turn", active: Awaited>, request: JsonRecord, runBody: JsonRecord | null): JsonRecord { + return { + action: "session-send-plan", + dryRun: true, + mutation: false, + sessionId, + decision, + internalCommandType: decision, + activeBefore: active ? activeBeforeSummary(active) : null, + request, + ...(runBody ? { run: { bodyBytes: jsonByteLength(runBody), sessionRef: summarizeSendSessionRef(runBody), valuesPrinted: false } } : {}), + next: { confirm: `./scripts/agentrun sessions send ${sessionId} --prompt-stdin`, note: "Remove --dry-run to perform the mutation. Manager will decide internal steer vs turn from durable session state." }, + valuesPrinted: false, + }; +} + +function sessionSendResponse(input: { sessionId: string; decision: "steer" | "turn"; run: RunRecord; command: CommandRecord; runnerJob: JsonValue; activeBefore: Awaited>; dryRun: false }): JsonRecord { + return { + action: "session-send", + dryRun: input.dryRun, + mutation: true, + sessionId: input.sessionId, + decision: input.decision, + internalCommandType: input.command.type, + run: input.run as unknown as JsonRecord, + command: input.command as unknown as JsonRecord, + runnerJob: input.runnerJob, + activeBefore: input.activeBefore ? activeBeforeSummary(input.activeBefore) : null, + pollCommands: { + show: `./scripts/agentrun sessions show ${input.sessionId} --reader-id cli`, + trace: `./scripts/agentrun sessions trace ${input.sessionId} --after-seq 0 --limit 100`, + output: `./scripts/agentrun sessions output ${input.sessionId} --after-seq 0 --limit 100`, + read: `./scripts/agentrun sessions read ${input.sessionId} --reader-id cli`, + cancel: `./scripts/agentrun sessions cancel ${input.sessionId}`, + }, + valuesPrinted: false, + }; +} + +function activeBeforeSummary(active: NonNullable>>): JsonRecord { + return { runId: active.run.id, commandId: active.command.id, commandState: active.command.state, runStatus: active.run.status, leaseExpiresAt: active.run.leaseExpiresAt, leaseExpired: active.leaseExpired, reason: active.reason, valuesPrinted: false }; +} + +function summarizeSendSessionRef(runBody: JsonRecord): JsonRecord { + const ref = asJsonRecord(runBody.sessionRef) ?? {}; + return { sessionId: optionalString(ref.sessionId), conversationId: optionalString(ref.conversationId), threadId: optionalString(ref.threadId), metadataKeys: Object.keys(asJsonRecord(ref.metadata) ?? {}).sort(), valuesPrinted: false }; +} + +function runIsTerminal(run: RunRecord): boolean { + return run.status === "completed" || run.status === "failed" || run.status === "blocked" || run.status === "cancelled"; +} + +function commandIsTerminal(command: CommandRecord): boolean { + return command.state === "completed" || command.state === "failed" || command.state === "cancelled"; +} + +function optionalString(value: JsonValue | undefined): string | null { + return typeof value === "string" && value.trim().length > 0 ? value.trim() : null; +} + +function jsonByteLength(value: unknown): number { + return Buffer.byteLength(JSON.stringify(value ?? null), "utf8"); +} + function integerQuery(url: URL, key: string, fallback: number): number { const value = Number(url.searchParams.get(key)); return Number.isInteger(value) && value >= 0 ? value : fallback;