From b761ef671367219a52270a7aca520a98abe75391 Mon Sep 17 00:00:00 2001 From: Codex Date: Wed, 3 Jun 2026 11:27:55 +0800 Subject: [PATCH] feat: add session subagent cli control --- docs/reference/spec-v01-agentrun-mgr.md | 9 + docs/reference/spec-v01-cli.md | 19 +- docs/reference/spec-v01-postgres.md | 7 +- docs/reference/spec-v01-queue.md | 20 +- scripts/src/cli.ts | 181 ++++++++++++- src/common/types.ts | 46 ++++ src/common/validation.ts | 12 +- src/mgr/postgres-store.ts | 273 +++++++++++++++++++- src/mgr/server.ts | 52 +++- src/mgr/store.ts | 178 ++++++++++++- src/selftest/cases/00-redaction-postgres.ts | 2 +- src/selftest/cases/65-session-control.ts | 68 +++++ 12 files changed, 833 insertions(+), 34 deletions(-) create mode 100644 src/selftest/cases/65-session-control.ts diff --git a/docs/reference/spec-v01-agentrun-mgr.md b/docs/reference/spec-v01-agentrun-mgr.md index 4257cb3..4645487 100644 --- a/docs/reference/spec-v01-agentrun-mgr.md +++ b/docs/reference/spec-v01-agentrun-mgr.md @@ -43,9 +43,17 @@ POST /api/v1/runs/:runId/runner-jobs GET /api/v1/runs/:runId/runner-jobs?commandId= GET /api/v1/runs/:runId/runner-jobs/:runnerJobId POST /api/v1/commands/:commandId/cancel +GET /api/v1/sessions?state=default&readerId=cli&backendProfile=&cursor=&limit=50 +GET /api/v1/sessions/:sessionId?readerId=cli +GET /api/v1/sessions/:sessionId/trace?afterSeq=0&limit=100&runId= +GET /api/v1/sessions/:sessionId/output?afterSeq=0&limit=100&runId= +POST /api/v1/sessions/:sessionId/read +POST /api/v1/sessions/:sessionId/control GET /api/v1/backends ``` +Session API 是异步 subagent 的轻量控制面。`state=default` 必须只返回 running 和 unread session;`state=all` 才返回历史 read session。command/run 进入 terminal 后,所属 session 的 projection 必须进入 `executionState=terminal` 并 bump version,使未读 reader 在 `ps/default` 中看到它;`POST /read` 写入 reader cursor 后,该 session 不再出现在该 reader 的默认列表中。`trace/output` 只分页读取所属 run 的 events,不代理 Queue summary;`control action=cancel` 取消 active command 或 active run。 + 面向 HWLAB v0.2 canary 的手动调度 API 目标见 [spec-v01-hwlab-manual-dispatch.md](spec-v01-hwlab-manual-dispatch.md)。`runner-jobs` 只显式启动当前 run/command 的 runner Job,不扫描 pending queue,不等待完整模型 turn;自动 scheduler 仍是 deferred 能力。后续 durable cancel API 必须与同一 run/command 状态机衔接,不能让 HWLAB 直接删除 Kubernetes Job 作为正式取消语义。 Runner 私有 API 的 `v0.1` 范围: @@ -163,6 +171,7 @@ Manager 只承接 HWLAB v0.2 Code Agent 的通用执行事实,不承接 HWLAB | Manager REST API | 已实现/已通过主闭环 | 已有 run、command、event、backends、runner register、claim、lease heartbeat、poll、ack、status、runner Job 创建和 health/readiness 的 HTTP JSON API;真实 runtime 已通过 RESTful API 主闭环。 | | 手动 runner Job API | 已实现 | `POST /api/v1/runs/:runId/runner-jobs` 已可创建 Kubernetes runner Job,并固化 idempotency、持久 runner job record、响应 schema 和 cancel 前置检查。 | | runner Job 状态查询 | 已实现 | `GET /api/v1/runs/:runId/runner-jobs` 和 `GET /api/v1/runs/:runId/runner-jobs/:runnerJobId` 返回 attempt/job/log/phase/terminal 摘要,业务客户端无需直连 Kubernetes 做最小定位。 | +| Session 控制面 API | 已实现/Q3 | 已提供 `list/show/trace/output/read/control(cancel)`;session projection 保存 running/terminal、active run/command、last event seq 和 read cursor,用于 CLI `ps/unread`。 | | command/run terminal 分离 | 已实现最小闭环 | `PATCH /api/v1/commands/:commandId/status` 终结 command 并更新 SessionRef;普通 turn completed 不终结 run,run status 仅由 run cancel 或 runner 级不可恢复失败终结。 | | Tenant policy boundary | 已实现最小边界 | v0.1 已做 schema、tenant/backend allowlist、executionPolicy 和 secretScope 结构校验;业务授权仍由 UniDesk/HWLAB 自己判定。 | | `deepseek` backendProfile allowlist | 已实现/已通过主闭环 | Manager validation、backend capability 和 matching SecretRef 校验已支持 `deepseek`;真实 runtime 已经通过 CI/CD 发布并确认 Postgres migration `002_v01_backend_profiles` 应用。 | diff --git a/docs/reference/spec-v01-cli.md b/docs/reference/spec-v01-cli.md index 0f2d6ec..f1e2826 100644 --- a/docs/reference/spec-v01-cli.md +++ b/docs/reference/spec-v01-cli.md @@ -63,10 +63,14 @@ CLI 官方 TypeScript 入口固定为 `scripts/agentrun-cli.ts`。在 G14 非交 ./scripts/agentrun queue cancel [--reason ] ./scripts/agentrun queue dispatch [--json-file ] ./scripts/agentrun queue refresh -./scripts/agentrun sessions show -./scripts/agentrun sessions output [--cursor ] [--limit ] -./scripts/agentrun sessions trace [--cursor ] [--limit ] -./scripts/agentrun sessions control --json-file +./scripts/agentrun sessions ps [--state default|running|unread|terminal|idle|all] [--profile codex|deepseek|minimax-m3|M3] [--reader-id ] +./scripts/agentrun sessions show [--reader-id ] +./scripts/agentrun sessions turn [sessionId] --json-file --prompt-file [--profile codex|deepseek|minimax-m3|M3] [--runner-json-file ] [--no-runner-job] +./scripts/agentrun sessions steer --prompt-file +./scripts/agentrun sessions cancel [--reason ] +./scripts/agentrun sessions trace [--after-seq ] [--limit ] [--run-id ] +./scripts/agentrun sessions output [--after-seq ] [--limit ] [--run-id ] +./scripts/agentrun sessions read [--reader-id ] ``` 具体参数可以在实现时按代码结构微调,但行为必须保持: @@ -85,6 +89,9 @@ CLI 官方 TypeScript 入口固定为 `scripts/agentrun-cli.ts`。在 G14 非交 - `queue dispatch` 是 Q2 的受控手动调度入口,只对单个 task 显式创建 attempt 和 Core run/command/runner job;不得伪装成自动 scheduler。 - `queue refresh` 只根据 Queue task 中保存的 Core run/command 引用回写 Queue attempt 状态,不读取 Core trace 反推 commander 或统计。 - `queue show` 必须返回 task/attempt summary、state、read cursor、stats 相关字段和 `sessionPath`;不得返回或代理完整 output/trace。 +- `sessions ps` 默认只显示 running 和 unread session;`--state all` 才显示历史 read session,避免旧 session 噪声淹没当前进度。 +- `sessions turn` 是异步 subagent 的受控 CLI 入口:短返回 run、command、runnerJob 和后续 poll/read/steer/cancel 命令,不等待模型完成。`--profile M3` 是 `minimax-m3` 的 CLI alias;profile 仍写入 canonical `backendProfile`,不得 fallback。 +- `sessions steer` 对当前 active run 创建 `type=steer` command;`sessions cancel` 通过 Session control 取消 active command 或 run;`sessions read` 写入 reader cursor,使 terminal session 从默认 ps 中消失。 - `sessions output` 与 `sessions trace` 是输出和 trace 的唯一 CLI 查询入口;不得新增 `queue output` 或 `queue trace` 兼容命令。 ## 配置与 Secret 边界 @@ -134,7 +141,7 @@ CLI 官方 TypeScript 入口固定为 `scripts/agentrun-cli.ts`。在 G14 非交 | Queue CLI | 已实现/Q1 | 已提供 `queue submit/list/show/stats/commander/read/cancel`,通过 manager REST 访问 Queue task 和 stats,不直连 Postgres。 | | Queue dispatch/refresh CLI | 已实现/Q2 | `queue dispatch` 受控创建 Core run/command/runner job;`queue refresh` 从 Core run/command 终态回写 Queue task/latestAttempt。 | | 本地 server 生命周期 CLI | 已实现/Q2 hardening | `server start` 默认后台短返回,`server status/stop` 提供 pid、port、logPath 和 readiness 可见性;`--foreground` 保留给容器/显式调试。 | -| Session CLI | 待实现 | 规格见 [spec-v01-queue.md](spec-v01-queue.md);输出和 trace 进入 Session 命令,Queue 命令不得代理 output/trace。 | +| Session CLI | 已实现/Q3 | 已提供 `sessions ps/show/turn/steer/cancel/trace/output/read`;默认 ps 只显示 running/unread,terminal 后自动 unread,read cursor 由 CLI 标记。 | | CLI 测试规格 | 已定义/已验证主闭环 | 综合联调见 [spec-v01-validation.md](spec-v01-validation.md);每次发布仍按手动交互验收复跑。 | | `deepseek` profile CLI | 已实现/已通过主闭环 | `secrets codex render --profile deepseek`、`backends list`、`runner start --backend`、`runner job` 和 JSON 错误可见性已实现;真实 CLI/RESTful 联调已通过 `codex -> deepseek -> codex` 切换主闭环。 | -| `minimax-m3` profile CLI | 已实现/待真实主闭环 | `secrets codex render --profile minimax-m3`、`backends list`、`runner start --backend`、`runner job` 和 JSON 错误可见性已实现;真实 CLI/RESTful 联调需要按 `codex -> deepseek -> minimax-m3 -> codex` 手动验收。 | +| `minimax-m3` profile CLI | 已实现/待真实主闭环 | `secrets codex render --profile minimax-m3`、`backends list`、`runner start --backend`、`runner job`、`sessions turn --profile minimax-m3|M3` 和 JSON 错误可见性已实现;真实 CLI/RESTful 联调需要按 `codex -> deepseek -> minimax-m3 -> codex` 手动验收。 | diff --git a/docs/reference/spec-v01-postgres.md b/docs/reference/spec-v01-postgres.md index 4acc536..643719e 100644 --- a/docs/reference/spec-v01-postgres.md +++ b/docs/reference/spec-v01-postgres.md @@ -35,7 +35,8 @@ Secret 名称和 key 可以在实现时按 Kubernetes 命名限制微调,但 - `agentrun_events`:按 run 和 seq 索引的 append-only event records。 - `agentrun_runners`:registered runner identity、placement、heartbeat 和 capability snapshot。 - `agentrun_runner_jobs`:手动 runner Job 的 idempotency key、payload hash、attempt/job identity 和创建响应。 -- `agentrun_sessions`:SessionRef 到 backend thread/cache identity 的最小映射,不保存 credential 文件或 Secret 值。 +- `agentrun_sessions`:SessionRef 到 backend thread/cache identity、execution projection、active run/command、terminal/unread 水位的映射,不保存 credential 文件或 Secret 值。 +- `agentrun_session_read_cursors`:按 reader 记录 Session 已读 version,用于 CLI 默认只看 running/unread。 - `agentrun_queue_tasks`:AgentRun Queue task identity、queue/lane、tenant/project、priority、state、backendProfile、workspace/resource 引用和 version。 - `agentrun_queue_attempts`:task attempt identity、runId、commandId、runnerJobId、sessionId、state、failureKind、retry index 和 timestamps。 - `agentrun_task_summaries` / `agentrun_attempt_summaries`:Queue 列表、详情和 commander 使用的轻量摘要;不能从 Core trace 反推 overview。 @@ -83,7 +84,7 @@ Secret 名称和 key 可以在实现时按 Kubernetes 命名限制微调,但 | --- | --- | --- | | Postgres durable store 规格 | 已定义 | 本文为 v0.1 存储权威。 | | StatefulSet/Service/PVC | 已实现/已通过主闭环 | `agentrun-v01-postgres` StatefulSet、Service 和 PVC 已由 GitOps runtime 提供,作为 `agentrun-v01` durable store。 | -| migration ledger | 已实现/已通过主闭环 | `agentrun-mgr` 启动 Postgres adapter 时幂等创建 `agentrun_schema_migrations` 并记录 migration id/checksum;当前最新 migration 为 `005_v01_minimax_m3_backend_profile`,用于在不改写既有 migration checksum 的前提下新增 `minimax-m3` backend profile;readiness 必须显示 migration ready。 | -| manager Postgres adapter | 已实现/已通过主闭环 | `agentrun-mgr` 通过 `DATABASE_URL` 启用 Postgres adapter,持久化 runs、commands、events、runners、runner_jobs、sessions、backends、leases、Queue task 和 read cursor;缺少 `DATABASE_URL` 时 live runtime fail fast,memory 只允许显式 self-test/dev。 | +| migration ledger | 已实现/已通过主闭环 | `agentrun-mgr` 启动 Postgres adapter 时幂等创建 `agentrun_schema_migrations` 并记录 migration id/checksum;当前最新 migration 为 `006_v01_session_control`,用于在不改写既有 migration checksum 的前提下新增 session projection/read cursor;readiness 必须显示 migration ready。 | +| manager Postgres adapter | 已实现/已通过主闭环 | `agentrun-mgr` 通过 `DATABASE_URL` 启用 Postgres adapter,持久化 runs、commands、events、runners、runner_jobs、sessions、session read cursors、backends、leases、Queue task 和 read cursor;缺少 `DATABASE_URL` 时 live runtime fail fast,memory 只允许显式 self-test/dev。 | | health/readiness store 状态 | 已实现 | health/readiness 返回 adapter、reachable、migrationReady、migrationId、failureKind 和 redacted Secret 状态,不输出 DSN 明文。 | | file/sqlite durable store | 不采用 | 只可用于临时本地测试,不作为 v0.1 runtime truth。 | diff --git a/docs/reference/spec-v01-queue.md b/docs/reference/spec-v01-queue.md index 27ce541..520a968 100644 --- a/docs/reference/spec-v01-queue.md +++ b/docs/reference/spec-v01-queue.md @@ -44,8 +44,10 @@ Session 公共 API 承接输出、trace 和会话控制: ```http GET /api/v1/sessions/:sessionId -GET /api/v1/sessions/:sessionId/output?cursor=&limit= -GET /api/v1/sessions/:sessionId/trace?cursor=&limit= +GET /api/v1/sessions?state=default&readerId=&backendProfile=&cursor=&limit= +GET /api/v1/sessions/:sessionId/output?afterSeq=&limit= +GET /api/v1/sessions/:sessionId/trace?afterSeq=&limit= +POST /api/v1/sessions/:sessionId/read POST /api/v1/sessions/:sessionId/control ``` @@ -89,10 +91,14 @@ AgentRun CLI 必须提供 Queue 和 Session 两组命令。Queue 命令只操作 Session 命令负责输出、trace 和会话控制: ```bash -./scripts/agentrun sessions show -./scripts/agentrun sessions output [--cursor ] [--limit ] -./scripts/agentrun sessions trace [--cursor ] [--limit ] -./scripts/agentrun sessions control --json-file +./scripts/agentrun sessions ps [--state default|running|unread|terminal|idle|all] [--profile codex|deepseek|minimax-m3|M3] [--reader-id ] +./scripts/agentrun sessions show [--reader-id ] +./scripts/agentrun sessions turn [sessionId] --json-file --prompt-file [--profile codex|deepseek|minimax-m3|M3] +./scripts/agentrun sessions steer --prompt-file +./scripts/agentrun sessions cancel [--reason ] +./scripts/agentrun sessions output [--after-seq ] [--limit ] +./scripts/agentrun sessions trace [--after-seq ] [--limit ] +./scripts/agentrun sessions read [--reader-id ] ``` 不得新增 `queue output`、`queue trace` 或 `queue session/*` 这类子路径代理。`queue show` 最多打印 `sessionPath` 和下一步 `sessions ...` 命令。 @@ -177,7 +183,7 @@ Queue Q2 的真实手动验收必须覆盖以下稳定边界: | Queue RESTful API | 已实现/Q1 | 已通过 `agentrun-mgr` 暴露 `submit/list/show/stats/read/cancel/commander`,使用短请求和 Queue version/cursor 轻量轮询;Q2 再接入 attempt 与真实执行。 | | Queue CLI | 已实现/Q1 | 已加入 `queue submit/list/show/stats/commander/read/cancel`;Queue 命令只返回 task summary、stats、read cursor 和 `sessionPath`。 | | Queue dispatch/refresh | 已实现/Q2 | `queue dispatch` 受控创建 Core run/command/runner job;`queue refresh` 从 Core run/command 终态回写 Queue task/latestAttempt;自动 Scheduler 仍 deferred。 | -| Session API/CLI | 待实现 | Queue 只返回 `sessionPath`;Session 层承接输出、trace 和控制。 | +| Session API/CLI | 已实现/Q3 | Queue 只返回 `sessionPath`;Session 层已承接 `ps/show/turn/steer/cancel/output/trace/read`,默认列表只显示 running/unread。 | | Scheduler 接入 | 待实现 | 旧 Code Queue scheduler 不保留;AgentRun Scheduler 是唯一调度方向。 | | OA/Event/integrations | 不采用 | 首版不做,后续如需外部 connector/sink 必须单独立规格。 | | 历史迁移 | 不采用 | 旧 Code Queue 历史数据不迁移到 AgentRun。 | diff --git a/scripts/src/cli.ts b/scripts/src/cli.ts index e78f678..052cdd0 100644 --- a/scripts/src/cli.ts +++ b/scripts/src/cli.ts @@ -1,4 +1,5 @@ import { mkdir, readFile, rm, writeFile } from "node:fs/promises"; +import { randomUUID } from "node:crypto"; import { spawn } from "node:child_process"; import { closeSync, existsSync, openSync } from "node:fs"; import path from "node:path"; @@ -8,7 +9,7 @@ import { ManagerClient } from "../../src/mgr/client.js"; import { runOnce } from "../../src/runner/run-once.js"; import { renderRunnerJobDryRun } from "../../src/runner/k8s-job.js"; import { renderCodexProviderSecretPlan } from "./secret-render.js"; -import type { JsonRecord, JsonValue, RunRecord } from "../../src/common/types.js"; +import type { BackendProfile, CommandRecord, JsonRecord, JsonValue, RunRecord, SessionSummary } from "../../src/common/types.js"; import { AgentRunError, errorToJson } from "../../src/common/errors.js"; import type { RunnerOnceOptions } from "../../src/runner/run-once.js"; import { isBackendProfile } from "../../src/common/backend-profiles.js"; @@ -38,6 +39,14 @@ async function dispatch(args: ParsedArgs): Promise { if (group === "server" && command === "stop") return stopServer(args); if (group === "backends" && command === "list") return client(args).get("/api/v1/backends"); if (group === "secrets" && command === "codex" && id === "render") return renderCodexSecret(args); + if (group === "sessions" && command === "ps") return listSessions(args); + if (group === "sessions" && command === "show" && id) return client(args).get(`/api/v1/sessions/${encodeURIComponent(id)}${readerQuery(args)}`); + if (group === "sessions" && command === "read" && id) return client(args).post(`/api/v1/sessions/${encodeURIComponent(id)}/read`, { readerId: optionalFlag(args, "reader-id") ?? "cli" }); + if (group === "sessions" && command === "trace" && id) return sessionEvents(args, id, "trace"); + if (group === "sessions" && command === "output" && id) return sessionEvents(args, id, "output"); + if (group === "sessions" && command === "turn") return sessionTurn(args, id ?? null); + if (group === "sessions" && command === "steer" && id) return sessionSteer(args, id); + if (group === "sessions" && command === "cancel" && id) return sessionCancel(args, id); if (group === "queue" && command === "submit") return submitQueueTask(args); if (group === "queue" && command === "list") return listQueueTasks(args); if (group === "queue" && command === "show" && id) return client(args).get(`/api/v1/queue/tasks/${encodeURIComponent(id)}`); @@ -111,6 +120,92 @@ async function listRunnerJobs(args: ParsedArgs): Promise { return client(args).get(`/api/v1/runs/${encodeURIComponent(runId)}/runner-jobs${commandId ? `?commandId=${encodeURIComponent(commandId)}` : ""}`); } +async function listSessions(args: ParsedArgs): Promise { + const params = new URLSearchParams(); + const state = optionalFlag(args, "state") ?? (args.flags.get("running") === true ? "running" : args.flags.get("unread") === true ? "unread" : args.flags.get("all") === true ? "all" : null); + const profile = optionalFlag(args, "profile") ?? optionalFlag(args, "backend-profile"); + const readerId = optionalFlag(args, "reader-id"); + const cursor = optionalFlag(args, "cursor"); + const limit = optionalFlag(args, "limit"); + if (state) params.set("state", state); + if (profile) params.set("profile", normalizeProfile(profile)); + if (readerId) params.set("readerId", readerId); + if (cursor) params.set("cursor", cursor); + if (limit) params.set("limit", limit); + const query = params.toString(); + return client(args).get(`/api/v1/sessions${query ? `?${query}` : ""}`); +} + +async function sessionEvents(args: ParsedArgs, sessionId: string, kind: "trace" | "output"): Promise { + const params = new URLSearchParams(); + const afterSeq = optionalFlag(args, "after-seq"); + const limit = optionalFlag(args, "limit"); + const runId = optionalFlag(args, "run-id"); + if (afterSeq) params.set("afterSeq", afterSeq); + if (limit) params.set("limit", limit); + if (runId) params.set("runId", runId); + const query = params.toString(); + return client(args).get(`/api/v1/sessions/${encodeURIComponent(sessionId)}/${kind}${query ? `?${query}` : ""}`); +} + +async function sessionTurn(args: ParsedArgs, positionalSessionId: string | null): Promise { + const body = await optionalJsonFile(args); + const sessionId = positionalSessionId ?? optionalFlag(args, "session-id") ?? newSessionId(); + const requestedProfile = optionalFlag(args, "profile") ?? optionalFlag(args, "backend-profile") ?? (typeof body.backendProfile === "string" ? String(body.backendProfile) : "codex"); + const profile = normalizeProfile(requestedProfile); + const prompt = await readPrompt(args); + body.tenantId = optionalFlag(args, "tenant-id") ?? stringField(body, "tenantId", "unidesk"); + body.projectId = optionalFlag(args, "project-id") ?? stringField(body, "projectId", "default"); + body.providerId = optionalFlag(args, "provider-id") ?? stringField(body, "providerId", "G14"); + body.backendProfile = profile; + body.workspaceRef = jsonObjectFlag(args, "workspace-json") ?? objectField(body, "workspaceRef", { kind: "opaque", path: "." }); + body.executionPolicy = jsonObjectFlag(args, "execution-policy-json") ?? objectField(body, "executionPolicy", defaultExecutionPolicy(profile)); + body.traceSink = body.traceSink ?? null; + const sessionRef = objectField(body, "sessionRef", {}); + const sessionMetadata = objectField(sessionRef, "metadata", {}); + const title = optionalFlag(args, "title"); + if (title) sessionMetadata.title = title; + body.sessionRef = { ...sessionRef, sessionId, metadata: sessionMetadata }; + const run = await client(args).post("/api/v1/runs", body) as RunRecord; + const commandBody: JsonRecord = { type: "turn", payload: { prompt } }; + const commandIdempotencyKey = optionalFlag(args, "command-idempotency-key") ?? optionalFlag(args, "idempotency-key"); + if (commandIdempotencyKey) commandBody.idempotencyKey = commandIdempotencyKey; + const command = await client(args).post(`/api/v1/runs/${encodeURIComponent(run.id)}/commands`, commandBody) as CommandRecord; + let runnerJob: JsonValue = null; + if (args.flags.get("no-runner-job") !== true) { + const runnerBody = await optionalRunnerJsonFile(args); + runnerBody.commandId = command.id; + copyOptionalFlag(args, runnerBody, "image"); + copyOptionalFlag(args, runnerBody, "namespace"); + copyOptionalFlag(args, runnerBody, "attempt-id", "attemptId"); + copyOptionalFlag(args, runnerBody, "runner-id", "runnerId"); + copyOptionalFlag(args, runnerBody, "source-commit", "sourceCommit"); + copyOptionalFlag(args, runnerBody, "runner-manager-url", "managerUrl"); + copyOptionalFlag(args, runnerBody, "service-account-name", "serviceAccountName"); + const runnerIdempotencyKey = optionalFlag(args, "runner-idempotency-key"); + if (runnerIdempotencyKey) runnerBody.idempotencyKey = runnerIdempotencyKey; + runnerJob = await client(args).post(`/api/v1/runs/${encodeURIComponent(run.id)}/runner-jobs`, runnerBody); + } + return { action: "session-turn", sessionId, profile, run, command, runnerJob, pollCommands: { ps: `./scripts/agentrun sessions ps --reader-id cli --profile ${profile}`, show: `./scripts/agentrun sessions show ${sessionId} --reader-id cli`, trace: `./scripts/agentrun sessions trace ${sessionId} --after-seq 0 --limit 100`, output: `./scripts/agentrun sessions output ${sessionId} --after-seq 0 --limit 100`, read: `./scripts/agentrun sessions read ${sessionId} --reader-id cli`, steer: `./scripts/agentrun sessions steer ${sessionId} --prompt-file `, cancel: `./scripts/agentrun sessions cancel ${sessionId}` } }; +} + +async function sessionSteer(args: ParsedArgs, sessionId: string): Promise { + const session = await client(args).get(`/api/v1/sessions/${encodeURIComponent(sessionId)}${readerQuery(args)}`) as SessionSummary; + const runId = session.activeRunId ?? session.lastRunId; + if (!runId) throw new AgentRunError("schema-invalid", `session ${sessionId} has no run to steer`, { httpStatus: 2 }); + const prompt = await readPrompt(args); + const body: JsonRecord = { type: "steer", payload: { prompt } }; + const idempotencyKey = optionalFlag(args, "idempotency-key"); + if (idempotencyKey) body.idempotencyKey = idempotencyKey; + const command = await client(args).post(`/api/v1/runs/${encodeURIComponent(runId)}/commands`, body); + return { action: "session-steer", sessionId, runId, command }; +} + +async function sessionCancel(args: ParsedArgs, sessionId: string): Promise { + const result = await client(args).post(`/api/v1/sessions/${encodeURIComponent(sessionId)}/control`, { action: "cancel", ...cancelBody(args) }); + return { action: "session-cancel", sessionId, result }; +} + async function submitQueueTask(args: ParsedArgs): Promise { const body = await jsonFile(args); const idempotencyKey = optionalFlag(args, "idempotency-key"); @@ -410,6 +505,82 @@ async function optionalJsonFile(args: ParsedArgs): Promise { return jsonFile(args); } +async function optionalRunnerJsonFile(args: ParsedArgs): Promise { + const file = optionalFlag(args, "runner-json-file"); + if (!file) return {}; + const value = JSON.parse(await readFile(file, "utf8")) as unknown; + if (typeof value === "object" && value !== null && !Array.isArray(value)) return value as JsonRecord; + throw new AgentRunError("schema-invalid", "runner json file must contain an object", { httpStatus: 2 }); +} + +async function readPrompt(args: ParsedArgs): Promise { + const promptFlag = optionalFlag(args, "prompt"); + if (promptFlag) return promptFlag; + const promptFile = optionalFlag(args, "prompt-file"); + if (promptFile) { + const text = await readFile(promptFile, "utf8"); + if (text.trim().length === 0) throw new AgentRunError("schema-invalid", "prompt file is empty", { httpStatus: 2 }); + return text; + } + if (args.flags.get("prompt-stdin") === true) { + const text = await readStdinText(); + if (text.trim().length === 0) throw new AgentRunError("schema-invalid", "stdin prompt is empty", { httpStatus: 2 }); + return text; + } + const inline = args.positional.slice(3).join(" ").trim(); + if (inline.length > 0) return inline; + throw new AgentRunError("schema-invalid", "prompt is required; use --prompt, --prompt-file, --prompt-stdin, or trailing text", { httpStatus: 2 }); +} + +async function readStdinText(): Promise { + const chunks: Buffer[] = []; + for await (const chunk of process.stdin) chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)); + return Buffer.concat(chunks).toString("utf8"); +} + +function normalizeProfile(value: string): BackendProfile { + const normalized = value.trim().toLowerCase(); + const profile = normalized === "m3" || normalized === "minimax" || normalized === "minimax_m3" ? "minimax-m3" : normalized; + if (!isBackendProfile(profile)) throw new AgentRunError("schema-invalid", `backend profile ${value} is not supported in v0.1`, { httpStatus: 2 }); + return profile; +} + +function newSessionId(): string { + return `ses_${randomUUID().replace(/-/gu, "")}`; +} + +function stringField(record: JsonRecord, key: string, fallback: string): string { + const value = record[key]; + return typeof value === "string" && value.trim().length > 0 ? value.trim() : fallback; +} + +function objectField(record: JsonRecord, key: string, fallback: JsonRecord): JsonRecord { + const value = record[key]; + return typeof value === "object" && value !== null && !Array.isArray(value) ? value as JsonRecord : fallback; +} + +function jsonObjectFlag(args: ParsedArgs, name: string): JsonRecord | null { + const value = optionalFlag(args, name); + if (!value) return null; + const parsed = JSON.parse(value) as unknown; + if (typeof parsed === "object" && parsed !== null && !Array.isArray(parsed)) return parsed as JsonRecord; + throw new AgentRunError("schema-invalid", `--${name} must be a JSON object`, { httpStatus: 2 }); +} + +function defaultExecutionPolicy(profile: BackendProfile): JsonRecord { + return { sandbox: "workspace-write", approval: "never", timeoutMs: 900000, network: "enabled", secretScope: { allowCredentialEcho: false, providerCredentials: [{ profile, secretRef: { name: `agentrun-v01-provider-${profile}`, keys: ["auth.json", "config.toml"] } }] } }; +} + +function copyOptionalFlag(args: ParsedArgs, target: JsonRecord, flagName: string, key = flagName.replace(/-([a-z])/gu, (_, letter: string) => letter.toUpperCase())): void { + const value = optionalFlag(args, flagName); + if (value) target[key] = value; +} + +function readerQuery(args: ParsedArgs): string { + const readerId = optionalFlag(args, "reader-id"); + return readerId ? `?readerId=${encodeURIComponent(readerId)}` : ""; +} + function parseArgs(argv: string[]): ParsedArgs { const positional: string[] = []; const flags = new Map(); @@ -453,6 +624,14 @@ function help(): JsonRecord { "runs events --after-seq --limit ", "runs result [--command-id ]", "runs cancel [--reason ]", + "sessions ps [--state default|running|unread|terminal|idle|all] [--profile codex|deepseek|minimax-m3|M3] [--reader-id ]", + "sessions show [--reader-id ]", + "sessions turn [sessionId] --json-file --prompt-file [--profile minimax-m3|M3] [--runner-json-file ]", + "sessions steer --prompt-file ", + "sessions cancel [--reason ]", + "sessions trace [--after-seq ] [--limit ] [--run-id ]", + "sessions output [--after-seq ] [--limit ] [--run-id ]", + "sessions read [--reader-id ]", "commands create --type turn|steer|interrupt --json-file ", "commands show --run-id ", "commands result --run-id ", diff --git a/src/common/types.ts b/src/common/types.ts index 288e465..4335eb1 100644 --- a/src/common/types.ts +++ b/src/common/types.ts @@ -29,6 +29,9 @@ export type CommandState = "pending" | "acknowledged" | "completed" | "failed" | export type TerminalStatus = "completed" | "failed" | "blocked" | "cancelled"; export type BackendProfile = "codex" | "deepseek" | "minimax-m3"; export type QueueTaskState = "pending" | "running" | "completed" | "failed" | "blocked" | "cancelled"; +export type SessionExecutionState = "idle" | "running" | "terminal"; +export type SessionAttentionState = "active" | "unread" | "read"; +export type SessionListState = "default" | "running" | "unread" | "terminal" | "idle" | "all"; export interface WorkspaceRef extends JsonRecord { kind: "git-worktree" | "host-path" | "kubernetes-pvc" | "opaque"; @@ -179,11 +182,54 @@ export interface SessionRecord extends JsonRecord { conversationId: string | null; threadId: string | null; metadata: JsonRecord; + version: number; + executionState: SessionExecutionState; + lastRunId: string | null; + lastCommandId: string | null; + activeRunId: string | null; + activeCommandId: string | null; + lastEventSeq: number; + terminalStatus: TerminalStatus | null; + failureKind: FailureKind | null; + title: string | null; + summary: JsonRecord; + lastActivityAt: string | null; createdAt: string; updatedAt: string; expiresAt: string | null; } +export interface SessionReadCursorRecord extends JsonRecord { + sessionId: string; + readerId: string; + sessionVersion: number; + readAt: string; +} + +export interface SessionSummary extends SessionRecord { + sessionPath: string; + readerId: string | null; + readCursor: SessionReadCursorRecord | null; + attentionState: SessionAttentionState; + unread: boolean; + active: boolean; +} + +export interface SessionListResult extends JsonRecord { + items: SessionSummary[]; + count: number; + cursor: string | null; + filters: JsonRecord; +} + +export interface SessionEventPage extends JsonRecord { + sessionId: string; + runId: string | null; + items: RunEvent[]; + count: number; + cursor: string | null; +} + export interface RunnerJobRecord extends JsonRecord { id: string; runId: string; diff --git a/src/common/validation.ts b/src/common/validation.ts index fdcd698..3e058a7 100644 --- a/src/common/validation.ts +++ b/src/common/validation.ts @@ -1,5 +1,5 @@ import { createHash, randomUUID } from "node:crypto"; -import type { BackendProfile, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, ExecutionPolicy, JsonRecord, JsonValue, QueueTaskState, ResourceBundleRef, SecretRef, SessionRef } from "./types.js"; +import type { BackendProfile, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, ExecutionPolicy, JsonRecord, JsonValue, QueueTaskState, ResourceBundleRef, SecretRef, SessionListState, SessionRef } from "./types.js"; import { AgentRunError } from "./errors.js"; import { backendProfileSpec, backendProfiles, isBackendProfile } from "./backend-profiles.js"; @@ -328,3 +328,13 @@ export function validateQueueTaskState(value: string): QueueTaskState { if (value === "pending" || value === "running" || value === "completed" || value === "failed" || value === "blocked" || value === "cancelled") return value; throw new AgentRunError("schema-invalid", `queue task state ${value} is not supported`, { httpStatus: 400 }); } + +export function validateSessionListState(value: string): SessionListState { + if (value === "default" || value === "running" || value === "unread" || value === "terminal" || value === "idle" || value === "all") return value; + throw new AgentRunError("schema-invalid", `session state ${value} is not supported`, { httpStatus: 400 }); +} + +export function validateBackendProfile(value: string): BackendProfile { + if (isBackendProfile(value)) return value; + throw new AgentRunError("schema-invalid", `backendProfile ${value} is not supported in v0.1`, { httpStatus: 400, details: { allowedBackends: [...backendProfiles] } }); +} diff --git a/src/mgr/postgres-store.ts b/src/mgr/postgres-store.ts index e4ac6b6..89c3975 100644 --- a/src/mgr/postgres-store.ts +++ b/src/mgr/postgres-store.ts @@ -3,10 +3,10 @@ import { Pool } from "pg"; import type { PoolClient, QueryResultRow } from "pg"; import { AgentRunError } from "../common/errors.js"; import { redactJson } from "../common/redaction.js"; -import type { BackendProfile, BackendTurnResult, CommandRecord, CommandState, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, EventType, FailureKind, JsonRecord, JsonValue, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, RunStatus, SessionRecord, SessionRef, TerminalStatus } from "../common/types.js"; +import type { BackendProfile, BackendTurnResult, CommandRecord, CommandState, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, EventType, FailureKind, JsonRecord, JsonValue, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, RunStatus, SessionEventPage, SessionListResult, SessionReadCursorRecord, SessionRecord, SessionRef, SessionSummary, TerminalStatus } from "../common/types.js"; import { newId, nowIso, stableHash } from "../common/validation.js"; -import type { AgentRunStore, ListQueueTasksInput, SaveRunnerJobInput, StoreHealth, UpdateQueueTaskAttemptInput } from "./store.js"; -import { assertSessionBoundary, buildQueueStats, clampQueueLimit, commandStateFromTerminal, isLeaseExpired, isTerminalCommandState, isTerminalQueueTaskState, isTerminalRunStatus, parseQueueCursor, queueTaskSort, sessionRefFromRecord, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef } from "./store.js"; +import type { AgentRunStore, ListQueueTasksInput, ListSessionsInput, SaveRunnerJobInput, SessionEventPageInput, StoreHealth, UpdateQueueTaskAttemptInput } from "./store.js"; +import { assertSessionBoundary, buildQueueStats, buildSessionSummary, clampQueueLimit, clampSessionLimit, commandStateFromTerminal, isLeaseExpired, isSessionOutputEvent, isTerminalCommandState, isTerminalQueueTaskState, isTerminalRunStatus, parseQueueCursor, parseSessionCursor, queueTaskSort, sessionListFilters, sessionMatchesListState, sessionRefFromRecord, sessionSort, sessionTitleFromCommand, statusFromTerminal, summarizeResourceBundleRef, summarizeSessionRef, titleFromMetadata } from "./store.js"; import { backendCapabilitiesSqlValues, mergeBackendCapability } from "../common/backend-profiles.js"; import { normalizeRunEventPayload, requireEventType } from "../common/events.js"; @@ -137,6 +137,44 @@ ON CONFLICT (profile) DO UPDATE SET updated_at = EXCLUDED.updated_at; `; +const sessionControlMigrationSql = ` +ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS version bigint NOT NULL DEFAULT 1; +ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS execution_state text NOT NULL DEFAULT 'idle'; +ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS last_run_id text; +ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS last_command_id text; +ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS active_run_id text; +ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS active_command_id text; +ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS last_event_seq integer NOT NULL DEFAULT 0; +ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS terminal_status text; +ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS failure_kind text; +ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS title text; +ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS summary jsonb NOT NULL DEFAULT '{}'::jsonb; +ALTER TABLE agentrun_sessions ADD COLUMN IF NOT EXISTS last_activity_at timestamptz; + +CREATE SEQUENCE IF NOT EXISTS agentrun_session_version_seq; + +SELECT setval( + 'agentrun_session_version_seq', + GREATEST( + COALESCE((SELECT MAX(version) FROM agentrun_sessions), 0), + 1 + ), + true +); + +CREATE TABLE IF NOT EXISTS agentrun_session_read_cursors ( + session_id text NOT NULL REFERENCES agentrun_sessions(session_id) ON DELETE CASCADE, + reader_id text NOT NULL, + session_version bigint NOT NULL, + read_at timestamptz NOT NULL, + PRIMARY KEY (session_id, reader_id) +); + +CREATE INDEX IF NOT EXISTS agentrun_sessions_control_idx ON agentrun_sessions (execution_state, backend_profile, updated_at DESC); +CREATE INDEX IF NOT EXISTS agentrun_sessions_version_idx ON agentrun_sessions (version, updated_at DESC); +CREATE INDEX IF NOT EXISTS agentrun_runs_session_idx ON agentrun_runs ((session_ref->>'sessionId'), updated_at DESC); +`; + const hwlabManualDispatchMigrationSql = ` ALTER TABLE agentrun_runs ADD COLUMN IF NOT EXISTS session_ref jsonb; ALTER TABLE agentrun_runs ADD COLUMN IF NOT EXISTS resource_bundle_ref jsonb; @@ -254,13 +292,18 @@ const postgresMigrations: MigrationDefinition[] = [ checksum: checksumSql(minimaxM3BackendProfileMigrationSql), sql: minimaxM3BackendProfileMigrationSql, }, + { + id: "006_v01_session_control", + checksum: checksumSql(sessionControlMigrationSql), + sql: sessionControlMigrationSql, + }, ]; export function postgresMigrationContract(): JsonRecord { return { migrationIds: postgresMigrations.map((migration) => migration.id), latestMigrationId: latestMigrationId(), - requiredTables: ["agentrun_schema_migrations", "agentrun_runs", "agentrun_commands", "agentrun_events", "agentrun_runners", "agentrun_backends", "agentrun_leases", "agentrun_sessions", "agentrun_runner_jobs", "agentrun_queue_tasks", "agentrun_queue_read_cursors"], + requiredTables: ["agentrun_schema_migrations", "agentrun_runs", "agentrun_commands", "agentrun_events", "agentrun_runners", "agentrun_backends", "agentrun_leases", "agentrun_sessions", "agentrun_session_read_cursors", "agentrun_runner_jobs", "agentrun_queue_tasks", "agentrun_queue_read_cursors"], checksums: Object.fromEntries(postgresMigrations.map((migration) => [migration.id, migration.checksum])), }; } @@ -326,6 +369,7 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( VALUES ($1, $2, $3, $4::jsonb, $5::jsonb, $6::jsonb, $7, $8, $9::jsonb, $10::jsonb, $11, $12, $13, $14, $15, $16, $17, $18)`, [run.id, run.tenantId, run.projectId, JSON.stringify(run.workspaceRef), JSON.stringify(run.sessionRef), JSON.stringify(run.resourceBundleRef), run.providerId, run.backendProfile, JSON.stringify(run.executionPolicy), JSON.stringify(run.traceSink), run.status, run.terminalStatus, run.failureKind, run.failureMessage, run.createdAt, run.updatedAt, run.claimedBy, run.leaseExpiresAt], ); + await this.touchSessionForRun(client, run, { lastRunId: run.id, lastActivityAt: at }, { bumpVersion: false, at }); await this.appendEventWithLockedRun(client, run.id, "backend_status", { phase: "run-created", backendProfile: run.backendProfile, sessionRef: summarizeSessionRef(run.sessionRef ?? null), resourceBundleRef: summarizeResourceBundleRef(run.resourceBundleRef ?? null) }); return run; }); @@ -365,6 +409,8 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( VALUES ($1, $2, $3, $4, $5::jsonb, $6, $7, $8, $9, $10, $11)`, [command.id, command.runId, command.seq, command.type, JSON.stringify(command.payload), command.payloadHash, command.idempotencyKey ?? null, command.state, command.createdAt, command.updatedAt, command.acknowledgedAt], ); + if (command.type === "turn") await this.touchSessionForRun(client, run, { executionState: "running", lastRunId: run.id, lastCommandId: command.id, activeRunId: run.id, activeCommandId: command.id, terminalStatus: null, failureKind: null, title: sessionTitleFromCommand(command), lastActivityAt: at }, { bumpVersion: true, at }); + else if (command.type === "steer") await this.touchSessionForRun(client, run, { executionState: "running", lastRunId: run.id, lastCommandId: command.id, activeRunId: run.id, lastActivityAt: at }, { bumpVersion: true, at }); await this.appendEventWithLockedRun(client, runId, "backend_status", { phase: "command-created", commandId: command.id, commandType: command.type }); return command; }); @@ -464,8 +510,10 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( ON CONFLICT (run_id) DO UPDATE SET runner_id = EXCLUDED.runner_id, lease_expires_at = EXCLUDED.lease_expires_at, updated_at = EXCLUDED.updated_at`, [runId, runnerId, leaseExpiresAt, null, nowIso()], ); + const next = runFromRow(updated.rows[0]); + await this.touchSessionForRun(client, next, { executionState: "running", activeRunId: runId, lastRunId: runId, lastActivityAt: next.updatedAt }, { bumpVersion: false, at: next.updatedAt }); await this.appendEventWithLockedRun(client, runId, "backend_status", { phase: "run-claimed", runnerId }); - return runFromRow(updated.rows[0]); + return next; }); } @@ -505,6 +553,7 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( const updated = await client.query("UPDATE agentrun_commands SET state = $2, updated_at = $3 WHERE id = $1 RETURNING *", [commandId, state, nowIso()]); const run = await this.requireRunForUpdate(client, command.runId); if (result.threadId && run.sessionRef?.sessionId) await this.upsertSessionThread(client, run, result.threadId, result.turnId ?? null); + if (command.type === "turn") await this.touchSessionForRun(client, run, { executionState: "terminal", activeRunId: null, activeCommandId: null, lastRunId: command.runId, lastCommandId: command.id, terminalStatus: result.terminalStatus, failureKind: result.failureKind, lastActivityAt: nowIso() }, { bumpVersion: true }); await this.appendEventWithLockedRun(client, command.runId, "backend_status", { phase: "command-terminal", commandId, state, terminalStatus: result.terminalStatus, failureKind: result.failureKind, message: result.failureMessage ?? null, threadId: result.threadId ?? null, turnId: result.turnId ?? null }); return commandFromRow(updated.rows[0]); }); @@ -529,6 +578,7 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( const run = runFromRow(updated.rows[0]); if (result.threadId && run.sessionRef?.sessionId) await this.upsertSessionThread(client, run, result.threadId, result.turnId ?? null); await this.appendEventWithLockedRun(client, runId, "terminal_status", { terminalStatus: result.terminalStatus, failureKind: result.failureKind, message: result.failureMessage }); + await this.touchSessionForRun(client, run, { executionState: "terminal", activeRunId: null, activeCommandId: null, lastRunId: runId, terminalStatus: result.terminalStatus, failureKind: result.failureKind, lastActivityAt: run.updatedAt }, { bumpVersion: true, at: run.updatedAt }); return run; }); } @@ -550,7 +600,9 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( [runId, reason, at], ); await this.appendEventWithLockedRun(client, runId, "terminal_status", { terminalStatus: "cancelled", failureKind: "cancelled", message: reason }); - return runFromRow(updated.rows[0]); + const next = runFromRow(updated.rows[0]); + await this.touchSessionForRun(client, next, { executionState: "terminal", activeRunId: null, activeCommandId: null, lastRunId: runId, terminalStatus: "cancelled", failureKind: "cancelled", lastActivityAt: at }, { bumpVersion: true, at }); + return next; }); } @@ -563,6 +615,10 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( if (isTerminalCommandState(command.state)) return command; const updated = await client.query("UPDATE agentrun_commands SET state = 'cancelled', updated_at = $2 WHERE id = $1 RETURNING *", [commandId, nowIso()]); await this.appendEventWithLockedRun(client, command.runId, "backend_status", { phase: "command-terminal", commandId, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled", message: reason }); + if (command.type === "turn") { + const run = await this.requireRunForUpdate(client, command.runId); + await this.touchSessionForRun(client, run, { executionState: "terminal", activeRunId: null, activeCommandId: null, lastRunId: command.runId, lastCommandId: command.id, terminalStatus: "cancelled", failureKind: "cancelled", lastActivityAt: nowIso() }, { bumpVersion: true }); + } return commandFromRow(updated.rows[0]); }); } @@ -572,6 +628,63 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( return result.rows[0] ? sessionFromRow(result.rows[0]) : null; } + async getSessionSummary(sessionId: string, readerId: string | null = null): Promise { + const session = await this.getSession(sessionId); + if (!session) throw new AgentRunError("schema-invalid", `session ${sessionId} was not found`, { httpStatus: 404 }); + const cursor = readerId ? await this.getSessionReadCursor(sessionId, readerId) : null; + return buildSessionSummary(session, readerId, cursor); + } + + async listSessions(input: ListSessionsInput): Promise { + const startVersion = parseSessionCursor(input.cursor) ?? 0; + const state = input.state ?? "default"; + const params: unknown[] = [startVersion]; + const where = ["version > $1"]; + if (input.backendProfile) { + params.push(input.backendProfile); + where.push(`backend_profile = $${params.length}`); + } + const result = await this.pool.query(`SELECT * FROM agentrun_sessions WHERE ${where.join(" AND ")} ORDER BY updated_at DESC, session_id ASC LIMIT 500`, params); + const cursors = input.readerId ? await this.loadSessionReadCursors(input.readerId, result.rows.map((row) => stringValue(row.session_id))) : new Map(); + const items = result.rows + .map(sessionFromRow) + .map((session) => buildSessionSummary(session, input.readerId ?? null, input.readerId ? cursors.get(session.sessionId) ?? null : null)) + .filter((session) => sessionMatchesListState(session, state)) + .sort(sessionSort) + .slice(0, clampSessionLimit(input.limit)); + return { items, count: items.length, cursor: items.length > 0 ? String(items[items.length - 1]?.version ?? startVersion) : null, filters: sessionListFilters(input) }; + } + + async listSessionTrace(sessionId: string, input: SessionEventPageInput): Promise { + const runId = await this.resolveSessionRunId(sessionId, input.runId ?? null); + if (!runId) return { sessionId, runId: null, items: [], count: 0, cursor: null }; + const items = await this.listEvents(runId, input.afterSeq, input.limit); + return { sessionId, runId, items, count: items.length, cursor: items.length > 0 ? String(items[items.length - 1]?.seq ?? input.afterSeq) : null }; + } + + async listSessionOutput(sessionId: string, input: SessionEventPageInput): Promise { + const page = await this.listSessionTrace(sessionId, input); + const items = page.items.filter(isSessionOutputEvent); + return { ...page, items, count: items.length, cursor: items.length > 0 ? String(items[items.length - 1]?.seq ?? input.afterSeq) : null }; + } + + async markSessionRead(sessionId: string, readerId: string): Promise { + return this.withTransaction(async (client) => { + const result = await client.query("SELECT * FROM agentrun_sessions WHERE session_id = $1 FOR UPDATE", [sessionId]); + const row = result.rows[0]; + if (!row) throw new AgentRunError("schema-invalid", `session ${sessionId} was not found`, { httpStatus: 404 }); + const session = sessionFromRow(row); + const record: SessionReadCursorRecord = { sessionId, readerId, sessionVersion: session.version, readAt: nowIso() }; + await client.query( + `INSERT INTO agentrun_session_read_cursors (session_id, reader_id, session_version, read_at) + VALUES ($1, $2, $3, $4) + ON CONFLICT (session_id, reader_id) DO UPDATE SET session_version = EXCLUDED.session_version, read_at = EXCLUDED.read_at`, + [record.sessionId, record.readerId, record.sessionVersion, record.readAt], + ); + return record; + }); + } + async createQueueTask(input: CreateQueueTaskInput): Promise { const payloadHash = stableHash(input.payload); return this.withTransaction(async (client) => { @@ -697,6 +810,12 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( const seq = await this.nextSeq(client, "agentrun_events", runId); const event: RunEvent = { id: newId("evt"), runId, seq, type: eventType, payload: redactJson(eventPayload), createdAt: nowIso() }; await client.query("INSERT INTO agentrun_events (id, run_id, seq, type, payload, created_at) VALUES ($1, $2, $3, $4, $5::jsonb, $6)", [event.id, event.runId, event.seq, event.type, JSON.stringify(event.payload), event.createdAt]); + await client.query( + `UPDATE agentrun_sessions + SET last_event_seq = $2, last_activity_at = $3, updated_at = $3 + WHERE session_id = (SELECT session_ref->>'sessionId' FROM agentrun_runs WHERE id = $1)`, + [runId, event.seq, event.createdAt], + ); return event; } @@ -710,6 +829,25 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( return Number(result.rows[0]?.version ?? 1); } + private async nextSessionVersion(client: PoolClient): Promise { + const result = await client.query<{ version: string | number }>("SELECT nextval('agentrun_session_version_seq') AS version"); + return Number(result.rows[0]?.version ?? 1); + } + + private async getSessionReadCursor(sessionId: string, readerId: string): Promise { + const result = await this.pool.query("SELECT * FROM agentrun_session_read_cursors WHERE session_id = $1 AND reader_id = $2", [sessionId, readerId]); + return result.rows[0] ? sessionReadCursorFromRow(result.rows[0]) : null; + } + + private async loadSessionReadCursors(readerId: string, sessionIds: string[]): Promise> { + if (sessionIds.length === 0) return new Map(); + const result = await this.pool.query("SELECT * FROM agentrun_session_read_cursors WHERE reader_id = $1 AND session_id = ANY($2::text[])", [readerId, sessionIds]); + return new Map(result.rows.map((row) => { + const cursor = sessionReadCursorFromRow(row); + return [cursor.sessionId, cursor]; + })); + } + private async loadQueueTasksForProjection(queue?: string): Promise { if (queue) { const result = await this.pool.query("SELECT * FROM agentrun_queue_tasks WHERE queue = $1", [queue]); @@ -742,14 +880,26 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( conversationId: input.sessionRef.conversationId ?? null, threadId: input.sessionRef.threadId ?? null, metadata: input.sessionRef.metadata ?? {}, + version: await this.nextSessionVersion(client), + executionState: "idle", + lastRunId: null, + lastCommandId: null, + activeRunId: null, + activeCommandId: null, + lastEventSeq: 0, + terminalStatus: null, + failureKind: null, + title: titleFromMetadata(input.sessionRef.metadata ?? {}), + summary: {}, + lastActivityAt: at, createdAt: at, updatedAt: at, expiresAt: input.sessionRef.expiresAt ?? null, }; await client.query( - `INSERT INTO agentrun_sessions (session_id, tenant_id, project_id, backend_profile, conversation_id, thread_id, metadata, created_at, updated_at, expires_at) - VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8, $9, $10)`, - [record.sessionId, record.tenantId, record.projectId, record.backendProfile, record.conversationId, record.threadId, JSON.stringify(record.metadata), record.createdAt, record.updatedAt, record.expiresAt], + `INSERT INTO agentrun_sessions (session_id, tenant_id, project_id, backend_profile, conversation_id, thread_id, metadata, version, execution_state, last_run_id, last_command_id, active_run_id, active_command_id, last_event_seq, terminal_status, failure_kind, title, summary, last_activity_at, created_at, updated_at, expires_at) + VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18::jsonb, $19, $20, $21, $22)`, + [record.sessionId, record.tenantId, record.projectId, record.backendProfile, record.conversationId, record.threadId, JSON.stringify(record.metadata), record.version, record.executionState, record.lastRunId, record.lastCommandId, record.activeRunId, record.activeCommandId, record.lastEventSeq, record.terminalStatus, record.failureKind, record.title, JSON.stringify(record.summary), record.lastActivityAt, record.createdAt, record.updatedAt, record.expiresAt], ); return sessionRefFromRecord(record, input.sessionRef); } @@ -768,13 +918,25 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( conversationId: run.sessionRef.conversationId ?? existing?.conversationId ?? null, threadId, metadata, + version: await this.nextSessionVersion(client), + executionState: existing?.executionState ?? "idle", + lastRunId: existing?.lastRunId ?? run.id, + lastCommandId: existing?.lastCommandId ?? null, + activeRunId: existing?.activeRunId ?? null, + activeCommandId: existing?.activeCommandId ?? null, + lastEventSeq: existing?.lastEventSeq ?? 0, + terminalStatus: existing?.terminalStatus ?? null, + failureKind: existing?.failureKind ?? null, + title: existing?.title ?? titleFromMetadata(run.sessionRef.metadata ?? {}), + summary: existing?.summary ?? {}, + lastActivityAt: at, createdAt: existing?.createdAt ?? at, updatedAt: at, expiresAt: run.sessionRef.expiresAt ?? existing?.expiresAt ?? null, }; await client.query( - `INSERT INTO agentrun_sessions (session_id, tenant_id, project_id, backend_profile, conversation_id, thread_id, metadata, created_at, updated_at, expires_at) - VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8, $9, $10) + `INSERT INTO agentrun_sessions (session_id, tenant_id, project_id, backend_profile, conversation_id, thread_id, metadata, version, execution_state, last_run_id, last_command_id, active_run_id, active_command_id, last_event_seq, terminal_status, failure_kind, title, summary, last_activity_at, created_at, updated_at, expires_at) + VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18::jsonb, $19, $20, $21, $22) ON CONFLICT (session_id) DO UPDATE SET tenant_id = EXCLUDED.tenant_id, project_id = EXCLUDED.project_id, @@ -782,15 +944,81 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( conversation_id = EXCLUDED.conversation_id, thread_id = EXCLUDED.thread_id, metadata = EXCLUDED.metadata, + version = EXCLUDED.version, + execution_state = EXCLUDED.execution_state, + last_run_id = EXCLUDED.last_run_id, + last_command_id = EXCLUDED.last_command_id, + active_run_id = EXCLUDED.active_run_id, + active_command_id = EXCLUDED.active_command_id, + last_event_seq = EXCLUDED.last_event_seq, + terminal_status = EXCLUDED.terminal_status, + failure_kind = EXCLUDED.failure_kind, + title = EXCLUDED.title, + summary = EXCLUDED.summary, + last_activity_at = EXCLUDED.last_activity_at, updated_at = EXCLUDED.updated_at, expires_at = EXCLUDED.expires_at`, - [record.sessionId, record.tenantId, record.projectId, record.backendProfile, record.conversationId, record.threadId, JSON.stringify(record.metadata), record.createdAt, record.updatedAt, record.expiresAt], + [record.sessionId, record.tenantId, record.projectId, record.backendProfile, record.conversationId, record.threadId, JSON.stringify(record.metadata), record.version, record.executionState, record.lastRunId, record.lastCommandId, record.activeRunId, record.activeCommandId, record.lastEventSeq, record.terminalStatus, record.failureKind, record.title, JSON.stringify(record.summary), record.lastActivityAt, record.createdAt, record.updatedAt, record.expiresAt], ); const nextSessionRef = sessionRefFromRecord(record, run.sessionRef); await client.query("UPDATE agentrun_runs SET session_ref = $2::jsonb, updated_at = $3 WHERE id = $1", [run.id, JSON.stringify(nextSessionRef), at]); await this.appendEventWithLockedRun(client, run.id, "backend_status", { phase: "session-updated", sessionRef: summarizeSessionRef(nextSessionRef), turnId }); } + private async touchSessionForRun(client: PoolClient, run: RunRecord, patch: Partial, options: { bumpVersion: boolean; at?: string }): Promise { + const sessionId = run.sessionRef?.sessionId; + if (!sessionId) return; + const existingResult = await client.query("SELECT * FROM agentrun_sessions WHERE session_id = $1 FOR UPDATE", [sessionId]); + const existing = existingResult.rows[0] ? sessionFromRow(existingResult.rows[0]) : null; + if (!existing) return; + const at = options.at ?? nowIso(); + const version = options.bumpVersion ? await this.nextSessionVersion(client) : existing.version; + await client.query( + `UPDATE agentrun_sessions SET + version = $2, + execution_state = $3, + last_run_id = $4, + last_command_id = $5, + active_run_id = $6, + active_command_id = $7, + last_event_seq = $8, + terminal_status = $9, + failure_kind = $10, + title = $11, + summary = $12::jsonb, + last_activity_at = $13, + updated_at = $14 + WHERE session_id = $1`, + [ + sessionId, + version, + patch.executionState ?? existing.executionState, + patch.lastRunId === undefined ? existing.lastRunId : patch.lastRunId, + patch.lastCommandId === undefined ? existing.lastCommandId : patch.lastCommandId, + patch.activeRunId === undefined ? existing.activeRunId : patch.activeRunId, + patch.activeCommandId === undefined ? existing.activeCommandId : patch.activeCommandId, + patch.lastEventSeq ?? existing.lastEventSeq, + patch.terminalStatus === undefined ? existing.terminalStatus : patch.terminalStatus, + patch.failureKind === undefined ? existing.failureKind : patch.failureKind, + patch.title === undefined ? existing.title : patch.title, + JSON.stringify(patch.summary ?? existing.summary), + patch.lastActivityAt ?? at, + at, + ], + ); + } + + private async resolveSessionRunId(sessionId: string, requestedRunId: string | null): Promise { + const session = await this.getSession(sessionId); + if (!session) throw new AgentRunError("schema-invalid", `session ${sessionId} was not found`, { httpStatus: 404 }); + if (requestedRunId) { + const run = await this.getRun(requestedRunId); + if (run.sessionRef?.sessionId !== sessionId) throw new AgentRunError("schema-invalid", `run ${requestedRunId} does not belong to session ${sessionId}`, { httpStatus: 404 }); + return requestedRunId; + } + return session.activeRunId ?? session.lastRunId; + } + private async withTransaction(fn: (client: PoolClient) => Promise): Promise { const client = await this.pool.connect(); try { @@ -885,12 +1113,33 @@ function sessionFromRow(row: QueryResultRow): SessionRecord { conversationId: nullableString(row.conversation_id), threadId: nullableString(row.thread_id), metadata: jsonRecord(row.metadata), + version: Number(row.version ?? 1), + executionState: sessionExecutionState(row.execution_state), + lastRunId: nullableString(row.last_run_id), + lastCommandId: nullableString(row.last_command_id), + activeRunId: nullableString(row.active_run_id), + activeCommandId: nullableString(row.active_command_id), + lastEventSeq: Number(row.last_event_seq ?? 0), + terminalStatus: nullableString(row.terminal_status) as TerminalStatus | null, + failureKind: nullableString(row.failure_kind) as FailureKind | null, + title: nullableString(row.title), + summary: jsonRecord(row.summary), + lastActivityAt: nullableIso(row.last_activity_at), createdAt: iso(row.created_at), updatedAt: iso(row.updated_at), expiresAt: nullableIso(row.expires_at), }; } +function sessionReadCursorFromRow(row: QueryResultRow): SessionReadCursorRecord { + return { sessionId: stringValue(row.session_id), readerId: stringValue(row.reader_id), sessionVersion: Number(row.session_version), readAt: iso(row.read_at) }; +} + +function sessionExecutionState(value: unknown): SessionRecord["executionState"] { + if (value === "running" || value === "terminal") return value; + return "idle"; +} + function runnerJobFromRow(row: QueryResultRow): RunnerJobRecord { return { id: stringValue(row.id), diff --git a/src/mgr/server.ts b/src/mgr/server.ts index dcdb4c8..8585038 100644 --- a/src/mgr/server.ts +++ b/src/mgr/server.ts @@ -1,10 +1,10 @@ import type { Server } from "node:http"; import { createServer } from "node:http"; import type { AddressInfo } from "node:net"; -import type { AgentRunStore, ListQueueTasksInput } from "./store.js"; +import type { AgentRunStore, ListQueueTasksInput, ListSessionsInput, SessionEventPageInput } from "./store.js"; import { openAgentRunStoreFromEnv } from "./store.js"; import { AgentRunError, errorToJson } from "../common/errors.js"; -import { asRecord, validateCreateCommand, validateCreateQueueTask, validateCreateRun, validateQueueTaskState } from "../common/validation.js"; +import { asRecord, validateBackendProfile, validateCreateCommand, validateCreateQueueTask, validateCreateRun, validateQueueTaskState, validateSessionListState } from "../common/validation.js"; import type { ApiErrorBody, ApiOkBody, JsonRecord, JsonValue, RunEvent } from "../common/types.js"; import { createKubernetesRunnerJob } from "./kubernetes-runner-job.js"; import { dispatchQueueTask, refreshQueueTaskFromCore } from "./queue-dispatch.js"; @@ -69,6 +69,54 @@ async function route({ method, url, body, store, sourceCommit, runnerJobDefaults return { serviceId: "agentrun-mgr", live: true, ready, database, sourceCommit, secretRefs: { databaseUrl: database.adapter === "postgres" ? "redacted" : "not-used", valuesPrinted: false } }; } if (method === "GET" && path === "/api/v1/backends") return { items: await store.backends() as unknown as JsonValue }; + if (method === "GET" && path === "/api/v1/sessions") { + const input: ListSessionsInput = { limit: integerQuery(url, "limit", 50) }; + const state = url.searchParams.get("state"); + const backendProfile = url.searchParams.get("backendProfile") ?? url.searchParams.get("profile"); + const readerId = url.searchParams.get("readerId"); + const cursor = url.searchParams.get("cursor"); + if (state) input.state = validateSessionListState(state); + if (backendProfile) input.backendProfile = validateBackendProfile(backendProfile); + if (readerId) input.readerId = readerId; + if (cursor) input.cursor = cursor; + return await store.listSessions(input) as unknown as JsonValue; + } + const sessionMatch = path.match(/^\/api\/v1\/sessions\/([^/]+)$/u); + if (method === "GET" && sessionMatch) return await store.getSessionSummary(sessionMatch[1] ?? "", url.searchParams.get("readerId")) as unknown as JsonValue; + const sessionTraceMatch = path.match(/^\/api\/v1\/sessions\/([^/]+)\/trace$/u); + if (method === "GET" && sessionTraceMatch) { + const input: SessionEventPageInput = { afterSeq: integerQuery(url, "afterSeq", 0), limit: integerQuery(url, "limit", 100) }; + const runId = url.searchParams.get("runId"); + if (runId) input.runId = runId; + return await store.listSessionTrace(sessionTraceMatch[1] ?? "", input) as unknown as JsonValue; + } + const sessionOutputMatch = path.match(/^\/api\/v1\/sessions\/([^/]+)\/output$/u); + if (method === "GET" && sessionOutputMatch) { + const input: SessionEventPageInput = { afterSeq: integerQuery(url, "afterSeq", 0), limit: integerQuery(url, "limit", 100) }; + const runId = url.searchParams.get("runId"); + if (runId) input.runId = runId; + return await store.listSessionOutput(sessionOutputMatch[1] ?? "", input) as unknown as JsonValue; + } + const sessionReadMatch = path.match(/^\/api\/v1\/sessions\/([^/]+)\/read$/u); + if (method === "POST" && sessionReadMatch) { + const record = body === null ? {} : asRecord(body, "read"); + const readerId = typeof record.readerId === "string" && record.readerId.trim().length > 0 ? record.readerId.trim() : "cli"; + return await store.markSessionRead(sessionReadMatch[1] ?? "", readerId) as unknown as JsonValue; + } + const sessionControlMatch = path.match(/^\/api\/v1\/sessions\/([^/]+)\/control$/u); + if (method === "POST" && sessionControlMatch) { + const record = asRecord(body ?? {}, "sessionControl"); + const action = typeof record.action === "string" ? record.action : ""; + const session = await store.getSessionSummary(sessionControlMatch[1] ?? "", typeof record.readerId === "string" ? record.readerId : null); + if (action === "read") return await store.markSessionRead(session.sessionId, typeof record.readerId === "string" && record.readerId.trim().length > 0 ? record.readerId.trim() : "cli") as unknown as JsonValue; + if (action === "cancel") { + const reason = typeof record.reason === "string" && record.reason.trim().length > 0 ? record.reason.trim() : undefined; + if (session.activeCommandId) return await store.cancelCommand(session.activeCommandId, reason) as unknown as JsonValue; + if (session.activeRunId) return await store.cancelRun(session.activeRunId, reason) as unknown as JsonValue; + throw new AgentRunError("schema-invalid", `session ${session.sessionId} has no active run or command`, { httpStatus: 409 }); + } + throw new AgentRunError("schema-invalid", `session control action ${action} is not supported`, { httpStatus: 400 }); + } if (method === "POST" && path === "/api/v1/queue/tasks") return await store.createQueueTask(validateCreateQueueTask(body)) as unknown as JsonValue; if (method === "GET" && path === "/api/v1/queue/tasks") { const state = url.searchParams.get("state"); diff --git a/src/mgr/store.ts b/src/mgr/store.ts index 0ca2a02..8fb29df 100644 --- a/src/mgr/store.ts +++ b/src/mgr/store.ts @@ -1,4 +1,4 @@ -import type { BackendProfile, BackendTurnResult, CommandRecord, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, FailureKind, JsonRecord, QueueAttemptRef, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, SessionRecord, SessionRef, TerminalStatus } from "../common/types.js"; +import type { BackendProfile, BackendTurnResult, CommandRecord, CreateCommandInput, CreateQueueTaskInput, CreateRunInput, FailureKind, JsonRecord, QueueAttemptRef, QueueCommanderSnapshot, QueueReadCursorRecord, QueueStats, QueueTaskListResult, QueueTaskRecord, QueueTaskState, RunEvent, RunnerJobRecord, RunnerRecord, RunRecord, SessionEventPage, SessionListResult, SessionListState, SessionReadCursorRecord, SessionRecord, SessionRef, SessionSummary, TerminalStatus } from "../common/types.js"; import { AgentRunError } from "../common/errors.js"; import { newId, nowIso, stableHash } from "../common/validation.js"; import { redactJson } from "../common/redaction.js"; @@ -39,6 +39,11 @@ export interface AgentRunStore { cancelRun(runId: string, reason?: string): MaybePromise; cancelCommand(commandId: string, reason?: string): MaybePromise; getSession(sessionId: string): MaybePromise; + getSessionSummary(sessionId: string, readerId?: string | null): MaybePromise; + listSessions(input: ListSessionsInput): MaybePromise; + listSessionTrace(sessionId: string, input: SessionEventPageInput): MaybePromise; + listSessionOutput(sessionId: string, input: SessionEventPageInput): MaybePromise; + markSessionRead(sessionId: string, readerId: string): MaybePromise; createQueueTask(input: CreateQueueTaskInput): MaybePromise; listQueueTasks(input: ListQueueTasksInput): MaybePromise; getQueueTask(taskId: string): MaybePromise; @@ -59,6 +64,20 @@ export interface ListQueueTasksInput { updatedAfter?: number; } +export interface ListSessionsInput { + state?: SessionListState; + backendProfile?: BackendProfile; + readerId?: string | null; + cursor?: string; + limit: number; +} + +export interface SessionEventPageInput { + runId?: string | null; + afterSeq: number; + limit: number; +} + export interface UpdateQueueTaskAttemptInput { state: QueueTaskState; latestAttempt: QueueAttemptRef; @@ -98,10 +117,12 @@ export class MemoryAgentRunStore implements AgentRunStore { private readonly eventsByRun = new Map(); private readonly runners = new Map(); private readonly sessions = new Map(); + private readonly sessionReadCursors = new Map(); private readonly runnerJobs = new Map(); private readonly queueTasks = new Map(); private readonly queueReadCursors = new Map(); private queueVersion = 0; + private sessionVersion = 0; health(): StoreHealth { return { adapter: "memory-self-test", ready: true, reachable: true, migrationReady: true, migrationId: "memory-self-test", failureKind: null, message: null, credentialValuesPrinted: false }; @@ -113,6 +134,7 @@ export class MemoryAgentRunStore implements AgentRunStore { const run: RunRecord = { ...input, sessionRef, resourceBundleRef: input.resourceBundleRef ?? null, id: newId("run"), status: "pending", terminalStatus: null, failureKind: null, failureMessage: null, createdAt: at, updatedAt: at, claimedBy: null, leaseExpiresAt: null }; this.runs.set(run.id, run); this.eventsByRun.set(run.id, []); + this.touchSessionForRun(run, { lastRunId: run.id, lastActivityAt: at }, { bumpVersion: false, at }); this.appendEvent(run.id, "backend_status", { phase: "run-created", backendProfile: run.backendProfile, sessionRef: summarizeSessionRef(run.sessionRef ?? null), resourceBundleRef: summarizeResourceBundleRef(run.resourceBundleRef ?? null) }); return run; } @@ -143,6 +165,8 @@ export class MemoryAgentRunStore implements AgentRunStore { const seq = Array.from(this.commands.values()).filter((command) => command.runId === runId).length + 1; const command: CommandRecord = { ...input, id: newId("cmd"), runId, seq, state: "pending", payloadHash, createdAt: at, updatedAt: at, acknowledgedAt: null }; this.commands.set(command.id, command); + if (command.type === "turn") this.touchSessionForRun(run, { executionState: "running", lastRunId: run.id, lastCommandId: command.id, activeRunId: run.id, activeCommandId: command.id, terminalStatus: null, failureKind: null, title: sessionTitleFromCommand(command), lastActivityAt: at }, { bumpVersion: true, at }); + else if (command.type === "steer") this.touchSessionForRun(run, { executionState: "running", lastRunId: run.id, lastCommandId: command.id, activeRunId: run.id, lastActivityAt: at }, { bumpVersion: true, at }); this.appendEvent(runId, "backend_status", { phase: "command-created", commandId: command.id, commandType: command.type }); return command; } @@ -195,6 +219,7 @@ export class MemoryAgentRunStore implements AgentRunStore { if (isTerminalRunStatus(run.status)) throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${runId} is already terminal: ${run.status}`, { httpStatus: 409 }); if (run.claimedBy && run.claimedBy !== runnerId && !isLeaseExpired(run.leaseExpiresAt)) throw new AgentRunError("runner-lease-conflict", `run ${runId} is already claimed`, { httpStatus: 409 }); const next = this.updateRun(runId, { status: "claimed", claimedBy: runnerId, leaseExpiresAt: new Date(Date.now() + leaseMs).toISOString() }); + this.touchSessionForRun(next, { executionState: "running", activeRunId: runId, lastRunId: runId, lastActivityAt: next.updatedAt }, { bumpVersion: false, at: next.updatedAt }); this.appendEvent(runId, "backend_status", { phase: "run-claimed", runnerId }); return next; } @@ -221,6 +246,7 @@ export class MemoryAgentRunStore implements AgentRunStore { this.commands.set(commandId, next); const run = this.getRun(command.runId); if (result.threadId && run.sessionRef?.sessionId) this.upsertSessionThread(run, result.threadId, result.turnId ?? null); + if (command.type === "turn") this.touchSessionForRun(this.getRun(command.runId), { executionState: "terminal", activeRunId: null, activeCommandId: null, lastRunId: command.runId, lastCommandId: command.id, terminalStatus: result.terminalStatus, failureKind: result.failureKind, lastActivityAt: next.updatedAt }, { bumpVersion: true, at: next.updatedAt }); this.appendEvent(command.runId, "backend_status", { phase: "command-terminal", commandId, state: next.state, terminalStatus: result.terminalStatus, failureKind: result.failureKind, message: result.failureMessage ?? null, threadId: result.threadId ?? null, turnId: result.turnId ?? null }); return next; } @@ -233,6 +259,7 @@ export class MemoryAgentRunStore implements AgentRunStore { const event: RunEvent = { id: newId("evt"), runId, seq: events.length + 1, type: eventType, payload: redactJson(eventPayload), createdAt: nowIso() }; events.push(event); this.eventsByRun.set(runId, events); + this.touchSessionForRun(this.getRun(runId), { lastEventSeq: event.seq, lastActivityAt: event.createdAt }, { bumpVersion: false, at: event.createdAt }); return event; } @@ -243,6 +270,7 @@ export class MemoryAgentRunStore implements AgentRunStore { const next = this.updateRun(runId, { status, terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: result.failureMessage }); if (result.threadId && next.sessionRef?.sessionId) this.upsertSessionThread(next, result.threadId, result.turnId ?? null); this.appendEvent(runId, "terminal_status", { terminalStatus: result.terminalStatus, failureKind: result.failureKind, message: result.failureMessage }); + this.touchSessionForRun(this.getRun(runId), { executionState: "terminal", activeRunId: null, activeCommandId: null, lastRunId: runId, terminalStatus: result.terminalStatus, failureKind: result.failureKind, lastActivityAt: next.updatedAt }, { bumpVersion: true, at: next.updatedAt }); return next; } @@ -258,6 +286,7 @@ export class MemoryAgentRunStore implements AgentRunStore { this.appendEvent(runId, "backend_status", { phase: "cancel-requested", reason }); const next = this.updateRun(runId, { status: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled", failureMessage: reason }); this.appendEvent(runId, "terminal_status", { terminalStatus: "cancelled", failureKind: "cancelled", message: reason }); + this.touchSessionForRun(next, { executionState: "terminal", activeRunId: null, activeCommandId: null, lastRunId: runId, terminalStatus: "cancelled", failureKind: "cancelled", lastActivityAt: next.updatedAt }, { bumpVersion: true, at: next.updatedAt }); return next; } @@ -267,6 +296,7 @@ export class MemoryAgentRunStore implements AgentRunStore { const next = { ...command, state: "cancelled" as const, updatedAt: nowIso() }; this.commands.set(commandId, next); this.appendEvent(command.runId, "backend_status", { phase: "command-terminal", commandId, state: "cancelled", terminalStatus: "cancelled", failureKind: "cancelled", message: reason }); + if (command.type === "turn") this.touchSessionForRun(this.getRun(command.runId), { executionState: "terminal", activeRunId: null, activeCommandId: null, lastRunId: command.runId, lastCommandId: command.id, terminalStatus: "cancelled", failureKind: "cancelled", lastActivityAt: next.updatedAt }, { bumpVersion: true, at: next.updatedAt }); return next; } @@ -274,6 +304,46 @@ export class MemoryAgentRunStore implements AgentRunStore { return this.sessions.get(sessionId) ?? null; } + getSessionSummary(sessionId: string, readerId: string | null = null): SessionSummary { + const session = this.getSession(sessionId); + if (!session) throw new AgentRunError("schema-invalid", `session ${sessionId} was not found`, { httpStatus: 404 }); + return buildSessionSummary(session, readerId, readerId ? this.sessionReadCursors.get(sessionReadKey(sessionId, readerId)) ?? null : null); + } + + listSessions(input: ListSessionsInput): SessionListResult { + const startVersion = parseSessionCursor(input.cursor) ?? 0; + const state = input.state ?? "default"; + const items = Array.from(this.sessions.values()) + .map((session) => buildSessionSummary(session, input.readerId ?? null, input.readerId ? this.sessionReadCursors.get(sessionReadKey(session.sessionId, input.readerId)) ?? null : null)) + .filter((session) => session.version > startVersion) + .filter((session) => !input.backendProfile || session.backendProfile === input.backendProfile) + .filter((session) => sessionMatchesListState(session, state)) + .sort(sessionSort) + .slice(0, clampSessionLimit(input.limit)); + return { items, count: items.length, cursor: items.length > 0 ? String(items[items.length - 1]?.version ?? startVersion) : null, filters: sessionListFilters(input) }; + } + + listSessionTrace(sessionId: string, input: SessionEventPageInput): SessionEventPage { + const runId = this.resolveSessionRunId(sessionId, input.runId ?? null); + if (!runId) return { sessionId, runId: null, items: [], count: 0, cursor: null }; + const items = this.listEvents(runId, input.afterSeq, input.limit); + return { sessionId, runId, items, count: items.length, cursor: items.length > 0 ? String(items[items.length - 1]?.seq ?? input.afterSeq) : null }; + } + + listSessionOutput(sessionId: string, input: SessionEventPageInput): SessionEventPage { + const page = this.listSessionTrace(sessionId, input); + const items = page.items.filter(isSessionOutputEvent); + return { ...page, items, count: items.length, cursor: items.length > 0 ? String(items[items.length - 1]?.seq ?? input.afterSeq) : null }; + } + + markSessionRead(sessionId: string, readerId: string): SessionReadCursorRecord { + const session = this.getSession(sessionId); + if (!session) throw new AgentRunError("schema-invalid", `session ${sessionId} was not found`, { httpStatus: 404 }); + const record: SessionReadCursorRecord = { sessionId, readerId, sessionVersion: session.version, readAt: nowIso() }; + this.sessionReadCursors.set(sessionReadKey(sessionId, readerId), record); + return record; + } + createQueueTask(input: CreateQueueTaskInput): QueueTaskRecord { const payloadHash = stableHash(input.payload); if (input.idempotencyKey) { @@ -371,6 +441,18 @@ export class MemoryAgentRunStore implements AgentRunStore { conversationId: input.sessionRef.conversationId ?? null, threadId: input.sessionRef.threadId ?? null, metadata: input.sessionRef.metadata ?? {}, + version: this.nextSessionVersion(), + executionState: "idle", + lastRunId: null, + lastCommandId: null, + activeRunId: null, + activeCommandId: null, + lastEventSeq: 0, + terminalStatus: null, + failureKind: null, + title: titleFromMetadata(input.sessionRef.metadata ?? {}), + summary: {}, + lastActivityAt: at, createdAt: at, updatedAt: at, expiresAt: input.sessionRef.expiresAt ?? null, @@ -391,6 +473,18 @@ export class MemoryAgentRunStore implements AgentRunStore { conversationId: run.sessionRef.conversationId ?? existing?.conversationId ?? null, threadId, metadata: { ...(existing?.metadata ?? {}), ...(run.sessionRef.metadata ?? {}), ...(turnId ? { lastTurnId: turnId } : {}) }, + version: this.nextSessionVersion(), + executionState: existing?.executionState ?? "idle", + lastRunId: existing?.lastRunId ?? run.id, + lastCommandId: existing?.lastCommandId ?? null, + activeRunId: existing?.activeRunId ?? null, + activeCommandId: existing?.activeCommandId ?? null, + lastEventSeq: existing?.lastEventSeq ?? 0, + terminalStatus: existing?.terminalStatus ?? null, + failureKind: existing?.failureKind ?? null, + title: existing?.title ?? titleFromMetadata(run.sessionRef.metadata ?? {}), + summary: existing?.summary ?? {}, + lastActivityAt: at, createdAt: existing?.createdAt ?? at, updatedAt: at, expiresAt: run.sessionRef.expiresAt ?? existing?.expiresAt ?? null, @@ -400,6 +494,32 @@ export class MemoryAgentRunStore implements AgentRunStore { this.updateRun(run.id, { sessionRef: nextSessionRef }); this.appendEvent(run.id, "backend_status", { phase: "session-updated", sessionRef: summarizeSessionRef(nextSessionRef), turnId }); } + + private touchSessionForRun(run: RunRecord, patch: Partial, options: { bumpVersion: boolean; at?: string }): void { + const sessionId = run.sessionRef?.sessionId; + if (!sessionId) return; + const existing = this.sessions.get(sessionId); + if (!existing) return; + const at = options.at ?? nowIso(); + const next: SessionRecord = { ...existing, ...patch, version: options.bumpVersion ? this.nextSessionVersion() : existing.version, updatedAt: at, lastActivityAt: patch.lastActivityAt ?? at }; + this.sessions.set(sessionId, next); + } + + private resolveSessionRunId(sessionId: string, requestedRunId: string | null): string | null { + const session = this.getSession(sessionId); + if (!session) throw new AgentRunError("schema-invalid", `session ${sessionId} was not found`, { httpStatus: 404 }); + if (requestedRunId) { + const run = this.getRun(requestedRunId); + if (run.sessionRef?.sessionId !== sessionId) throw new AgentRunError("schema-invalid", `run ${requestedRunId} does not belong to session ${sessionId}`, { httpStatus: 404 }); + return requestedRunId; + } + return session.activeRunId ?? session.lastRunId; + } + + private nextSessionVersion(): number { + this.sessionVersion += 1; + return this.sessionVersion; + } } export function assertSessionBoundary(existing: SessionRecord, input: CreateRunInput): void { @@ -451,6 +571,43 @@ export function sessionRefFromRecord(record: SessionRecord, fallback: SessionRef }; } +export function buildSessionSummary(record: SessionRecord, readerId: string | null, readCursor: SessionReadCursorRecord | null): SessionSummary { + const active = record.executionState === "running" || record.activeRunId !== null || record.activeCommandId !== null; + const unread = !active && (!readCursor || readCursor.sessionVersion < record.version); + const attentionState = active ? "active" : unread ? "unread" : "read"; + return { ...record, sessionPath: `${record.tenantId}/${record.projectId}/${record.sessionId}`, readerId, readCursor, attentionState, unread, active }; +} + +export function sessionMatchesListState(session: SessionSummary, state: SessionListState): boolean { + if (state === "all") return true; + if (state === "default") return session.active || session.unread; + if (state === "running") return session.active; + if (state === "unread") return session.unread; + if (state === "terminal") return session.executionState === "terminal"; + if (state === "idle") return session.executionState === "idle"; + return false; +} + +export function sessionSort(a: SessionSummary, b: SessionSummary): number { + if (a.active !== b.active) return a.active ? -1 : 1; + if (a.unread !== b.unread) return a.unread ? -1 : 1; + return (b.lastActivityAt ?? b.updatedAt).localeCompare(a.lastActivityAt ?? a.updatedAt) || b.updatedAt.localeCompare(a.updatedAt) || a.sessionId.localeCompare(b.sessionId); +} + +export function clampSessionLimit(limit: number): number { + return Math.max(1, Math.min(Number.isFinite(limit) ? Math.trunc(limit) : 50, 100)); +} + +export function parseSessionCursor(cursor: string | undefined): number | null { + if (!cursor) return null; + const value = Number(cursor); + return Number.isInteger(value) && value >= 0 ? value : null; +} + +export function sessionListFilters(input: ListSessionsInput): JsonRecord { + return { state: input.state ?? "default", backendProfile: input.backendProfile ?? null, readerId: input.readerId ?? null, cursor: input.cursor ?? null, limit: clampSessionLimit(input.limit) }; +} + export function summarizeSessionRef(sessionRef: SessionRef | null): JsonRecord | null { if (!sessionRef) return null; return { @@ -510,3 +667,22 @@ export function parseQueueCursor(cursor: string | undefined): number | null { function queueReadKey(taskId: string, readerId: string): string { return `${taskId}:${readerId}`; } + +export function sessionReadKey(sessionId: string, readerId: string): string { + return `${sessionId}:${readerId}`; +} + +export function titleFromMetadata(metadata: JsonRecord): string | null { + const title = metadata.title; + return typeof title === "string" && title.trim().length > 0 ? title.trim().slice(0, 200) : null; +} + +export function sessionTitleFromCommand(command: CommandRecord): string | null { + const value = command.payload.prompt; + if (typeof value !== "string") return null; + return value.trim().replace(/\s+/gu, " ").slice(0, 120) || null; +} + +export function isSessionOutputEvent(event: RunEvent): boolean { + return event.type === "assistant_message" || event.type === "command_output" || event.type === "diff" || event.type === "error" || event.type === "terminal_status"; +} diff --git a/src/selftest/cases/00-redaction-postgres.ts b/src/selftest/cases/00-redaction-postgres.ts index 8f2e136..870461a 100644 --- a/src/selftest/cases/00-redaction-postgres.ts +++ b/src/selftest/cases/00-redaction-postgres.ts @@ -13,7 +13,7 @@ const selfTest: SelfTestCase = async () => { (error) => error instanceof AgentRunError && error.failureKind === "infra-failed" && error.message.includes("DATABASE_URL is required"), ); const postgresContract = postgresMigrationContract(); - assert.equal(postgresContract.latestMigrationId, "005_v01_minimax_m3_backend_profile"); + assert.equal(postgresContract.latestMigrationId, "006_v01_session_control"); assert.equal((postgresContract.checksums as Record)["002_v01_backend_profiles"], "928b5c490cc4539cb64ecef34784557601b2724fa2870570f16a53576804e49c"); assert.ok(Array.isArray(postgresContract.requiredTables)); assert.ok(postgresContract.requiredTables.includes("agentrun_schema_migrations")); diff --git a/src/selftest/cases/65-session-control.ts b/src/selftest/cases/65-session-control.ts new file mode 100644 index 0000000..c0b1e4d --- /dev/null +++ b/src/selftest/cases/65-session-control.ts @@ -0,0 +1,68 @@ +import assert from "node:assert/strict"; +import { startManagerServer } from "../../mgr/server.js"; +import { MemoryAgentRunStore } from "../../mgr/store.js"; +import { ManagerClient } from "../../mgr/client.js"; +import type { CommandRecord, RunEvent, RunRecord, SessionEventPage, SessionListResult, SessionReadCursorRecord, SessionSummary } from "../../common/types.js"; +import type { SelfTestCase } from "../harness.js"; + +const selfTest: SelfTestCase = async (context) => { + const server = await startManagerServer({ port: 0, host: "127.0.0.1", sourceCommit: "self-test", store: new MemoryAgentRunStore() }); + try { + const client = new ManagerClient(server.baseUrl); + const sessionId = "ses_selftest_control"; + const run = await client.post("/api/v1/runs", { + tenantId: "unidesk", + projectId: "pikasTech/agentrun", + workspaceRef: { kind: "host-path", path: context.workspace }, + sessionRef: { sessionId, metadata: { title: "session control self-test" } }, + providerId: "G14", + backendProfile: "codex", + executionPolicy: { + sandbox: "workspace-write", + approval: "never", + timeoutMs: 300000, + network: "default", + secretScope: { allowCredentialEcho: false, providerCredentials: [{ profile: "codex", secretRef: { name: "agentrun-v01-provider-codex", keys: ["auth.json", "config.toml"], mountPath: context.codexHome } }] }, + }, + traceSink: null, + }) as RunRecord; + const command = await client.post(`/api/v1/runs/${run.id}/commands`, { type: "turn", payload: { prompt: "write a concise status" }, idempotencyKey: "session-control-turn" }) as CommandRecord; + + const running = await client.get("/api/v1/sessions?state=running&readerId=self-test") as SessionListResult; + assert.equal(running.count, 1); + assert.equal(running.items[0]?.sessionId, sessionId); + assert.equal(running.items[0]?.active, true); + + await client.post(`/api/v1/runs/${run.id}/events`, { type: "assistant_message", payload: { text: "done" } }) as RunEvent; + await client.patch(`/api/v1/commands/${command.id}/status`, { terminalStatus: "completed", failureKind: null, failureMessage: null, threadId: "thread_selftest", turnId: "turn_selftest" }) as CommandRecord; + + const unread = await client.get("/api/v1/sessions?state=default&readerId=self-test") as SessionListResult; + assert.equal(unread.count, 1); + assert.equal(unread.items[0]?.sessionId, sessionId); + assert.equal(unread.items[0]?.unread, true); + assert.equal(unread.items[0]?.executionState, "terminal"); + + const trace = await client.get(`/api/v1/sessions/${sessionId}/trace?limit=20`) as SessionEventPage; + assert.ok(trace.count >= 3); + assert.equal(trace.runId, run.id); + + const output = await client.get(`/api/v1/sessions/${sessionId}/output?limit=20`) as SessionEventPage; + assert.equal(output.items.some((event) => event.type === "assistant_message"), true); + + const read = await client.post(`/api/v1/sessions/${sessionId}/read`, { readerId: "self-test" }) as SessionReadCursorRecord; + assert.equal(read.sessionId, sessionId); + assert.equal(read.readerId, "self-test"); + + const readDefault = await client.get("/api/v1/sessions?state=default&readerId=self-test") as SessionListResult; + assert.equal(readDefault.count, 0); + + const shown = await client.get(`/api/v1/sessions/${sessionId}?readerId=self-test`) as SessionSummary; + assert.equal(shown.unread, false); + assert.equal(shown.threadId, "thread_selftest"); + return { name: "session-control", tests: ["session-control-rest-memory"] }; + } finally { + await new Promise((resolve) => server.server.close(() => resolve())); + } +}; + +export default selfTest;