fix: 统一 session send 续跑入口

This commit is contained in:
AgentRun Codex
2026-06-11 21:57:29 +08:00
parent 4a5b298a30
commit 64b824911f
6 changed files with 341 additions and 107 deletions
+112 -85
View File
@@ -10,7 +10,7 @@ import { runOnce } from "../../src/runner/run-once.js";
import { renderRunnerJobDryRun } from "../../src/runner/k8s-job.js";
import type { RunnerSessionPvcOptions } from "../../src/runner/k8s-job.js";
import { renderCodexProviderSecretPlan } from "./secret-render.js";
import type { BackendProfile, CommandRecord, JsonRecord, JsonValue, RenderAipodInput, RenderedAipodQueueTask, RunRecord, SessionSummary } from "../../src/common/types.js";
import type { BackendProfile, JsonRecord, JsonValue, RenderAipodInput, RenderedAipodQueueTask, RunRecord } from "../../src/common/types.js";
import { AgentRunError, errorToJson } from "../../src/common/errors.js";
import type { RunnerOnceOptions } from "../../src/runner/run-once.js";
import { backendProfileSpec, isBackendProfile } from "../../src/common/backend-profiles.js";
@@ -81,6 +81,7 @@ async function dispatch(args: ParsedArgs): Promise<CliResult> {
if (group === "sessions" && command === "read" && id) return sessionRead(args, id);
if (group === "sessions" && command === "trace" && id) return sessionEvents(args, id, "trace");
if (group === "sessions" && command === "output" && id) return sessionEvents(args, id, "output");
if (group === "sessions" && command === "send") return sessionSend(args, id ?? null);
if (group === "sessions" && command === "turn") return sessionTurn(args, id ?? null);
if (group === "sessions" && command === "steer" && id) return sessionSteer(args, id);
if (group === "sessions" && command === "cancel" && id) return sessionCancel(args, id);
@@ -676,6 +677,77 @@ function summarizeSessionMutationResult(action: "session-cancel" | "session-read
};
}
function summarizeSessionSendResult(result: JsonValue, sessionId: string, compatibilityAlias: "turn" | "steer" | null, profile: string, aipod?: string): JsonRecord {
const record = jsonRecordValue(result);
const run = jsonRecordValue(record?.run);
const command = jsonRecordValue(record?.command);
const activeBefore = jsonRecordValue(record?.activeBefore);
const runnerJob = jsonRecordValue(record?.runnerJob);
const dryRun = record?.dryRun === true;
const afterSeq = numberValue(jsonRecordValue(record?.supervisor)?.lastSeq) ?? 0;
return {
action: dryRun ? "session-send-plan" : "session-send",
sessionId,
profile,
...(aipod ? { aipod } : {}),
compatibilityAlias,
dryRun,
mutation: record?.mutation === true,
decision: stringValue(record?.decision),
internalCommandType: stringValue(record?.internalCommandType),
activeBefore: activeBefore ? compactRecord(activeBefore, { keys: ["runId", "commandId", "commandState", "runStatus", "leaseExpiresAt", "leaseExpired", "reason"] }) : null,
run: summarizeRunRecord(run),
command: summarizeCommandRecord(command),
runnerJob: runnerJob ? compactRecord(runnerJob, { keys: ["action", "runId", "commandId", "attemptId", "runnerId", "namespace", "jobName", "image", "mutation"] }) : null,
fullResponseBytes: jsonByteLength(result),
valuesPrinted: false,
drillDownCommands: {
show: `./scripts/agentrun sessions show ${sessionId} --reader-id cli`,
trace: `./scripts/agentrun sessions trace ${sessionId} --after-seq ${afterSeq} --limit 100`,
output: `./scripts/agentrun sessions output ${sessionId} --after-seq ${afterSeq} --limit 100`,
read: `./scripts/agentrun sessions read ${sessionId} --reader-id cli`,
cancel: `./scripts/agentrun sessions cancel ${sessionId}`,
},
expandedOutput: {
fullFlag: "--full",
rawFlag: "--raw",
note: dryRun ? "Dry-run is non-mutating; remove --dry-run to send." : "Use --full on the original invocation for the full manager response.",
},
};
}
async function sessionRunnerJobBody(args: ParsedArgs, defaults: JsonRecord = {}): Promise<JsonRecord> {
const runnerOverrides = await optionalRunnerJsonFile(args);
const body = { ...defaults, ...runnerOverrides } as JsonRecord;
copyOptionalFlag(args, body, "image");
copyOptionalFlag(args, body, "namespace");
copyOptionalFlag(args, body, "attempt-id", "attemptId");
copyOptionalFlag(args, body, "runner-id", "runnerId");
copyOptionalFlag(args, body, "source-commit", "sourceCommit");
copyRunnerManagerUrlFlag(args, body);
copyOptionalFlag(args, body, "service-account-name", "serviceAccountName");
const runnerIdempotencyKey = optionalFlag(args, "runner-idempotency-key");
if (runnerIdempotencyKey) body.idempotencyKey = runnerIdempotencyKey;
return body;
}
async function ensureSessionForSend(args: ParsedArgs, sessionId: string, tenantId: string, projectId: string, profile: string): Promise<void> {
try {
await client(args).get(`/api/v1/sessions/${encodeURIComponent(sessionId)}/storage`);
return;
} catch (error) {
if (!(error instanceof AgentRunError) || error.httpStatus !== 404) throw error;
}
const expiresInDays = Number(optionalFlag(args, "expires-in-days") ?? 30);
await client(args).post("/api/v1/sessions", {
sessionId,
tenantId,
projectId,
backendProfile: profile,
expiresAt: new Date(Date.now() + Math.max(1, expiresInDays) * 24 * 60 * 60 * 1000).toISOString(),
});
}
interface QueueSummaryOptions {
limit: number;
}
@@ -1145,7 +1217,7 @@ async function sessionCreate(args: ParsedArgs, positionalSessionId: string | nul
show: `./scripts/agentrun sessions show ${sessionId} --reader-id cli`,
storage: `./scripts/agentrun sessions storage ${sessionId}`,
trace: `./scripts/agentrun sessions trace ${sessionId} --after-seq 0 --limit 100`,
turn: `./scripts/agentrun sessions turn ${sessionId} --prompt "..."`,
send: `./scripts/agentrun sessions send ${sessionId} --prompt "..."`,
},
};
}
@@ -1159,26 +1231,20 @@ async function sessionStorageDelete(args: ParsedArgs, sessionId: string): Promis
}
async function sessionTurn(args: ParsedArgs, positionalSessionId: string | null): Promise<JsonRecord> {
return sessionSend(args, positionalSessionId, { compatibilityAlias: "turn" });
}
async function sessionSteer(args: ParsedArgs, sessionId: string): Promise<JsonRecord> {
return sessionSend(args, sessionId, { compatibilityAlias: "steer" });
}
async function sessionSend(args: ParsedArgs, positionalSessionId: string | null, options: { compatibilityAlias?: "turn" | "steer" } = {}): Promise<JsonRecord> {
const aipod = optionalFlag(args, "aipod") ?? optionalFlag(args, "aipod-spec");
if (aipod) return sessionTurnWithAipod(args, positionalSessionId, aipod);
if (aipod) return sessionSendWithAipod(args, positionalSessionId, aipod, options);
const body = await optionalJsonFile(args);
const sessionId = positionalSessionId ?? optionalFlag(args, "session-id") ?? newSessionId();
const requestedProfile = optionalFlag(args, "profile") ?? optionalFlag(args, "backend-profile") ?? (typeof body.backendProfile === "string" ? String(body.backendProfile) : "codex");
const profile = normalizeProfile(requestedProfile);
if (positionalSessionId || optionalFlag(args, "session-id")) {
try {
await client(args).get(`/api/v1/sessions/${encodeURIComponent(sessionId)}/storage`);
} catch (error) {
const expiresInDays = Number(optionalFlag(args, "expires-in-days") ?? 30);
await client(args).post("/api/v1/sessions", {
sessionId,
tenantId: optionalFlag(args, "tenant-id") ?? "unidesk",
projectId: optionalFlag(args, "project-id") ?? "default",
backendProfile: profile,
expiresAt: new Date(Date.now() + Math.max(1, expiresInDays) * 24 * 60 * 60 * 1000).toISOString(),
});
}
}
const prompt = await readPrompt(args);
body.tenantId = optionalFlag(args, "tenant-id") ?? stringField(body, "tenantId", "unidesk");
body.projectId = optionalFlag(args, "project-id") ?? stringField(body, "projectId", "default");
@@ -1192,39 +1258,20 @@ async function sessionTurn(args: ParsedArgs, positionalSessionId: string | null)
const title = optionalFlag(args, "title");
if (title) sessionMetadata.title = title;
body.sessionRef = { ...sessionRef, sessionId, metadata: sessionMetadata };
const run = await client(args).post("/api/v1/runs", body) as RunRecord;
const commandBody: JsonRecord = { type: "turn", payload: { prompt } };
const runnerBody = await sessionRunnerJobBody(args);
const sendBody: JsonRecord = {
run: body,
payload: { prompt },
createRunnerJob: args.flags.get("no-runner-job") !== true,
runnerJob: runnerBody,
dryRun: args.flags.get("dry-run") === true,
};
const commandIdempotencyKey = optionalFlag(args, "command-idempotency-key") ?? optionalFlag(args, "idempotency-key");
if (commandIdempotencyKey) commandBody.idempotencyKey = commandIdempotencyKey;
const command = await client(args).post(`/api/v1/runs/${encodeURIComponent(run.id)}/commands`, commandBody) as CommandRecord;
let runnerJob: JsonValue = null;
if (args.flags.get("no-runner-job") !== true) {
const runnerBody = await optionalRunnerJsonFile(args);
runnerBody.commandId = command.id;
copyOptionalFlag(args, runnerBody, "image");
copyOptionalFlag(args, runnerBody, "namespace");
copyOptionalFlag(args, runnerBody, "attempt-id", "attemptId");
copyOptionalFlag(args, runnerBody, "runner-id", "runnerId");
copyOptionalFlag(args, runnerBody, "source-commit", "sourceCommit");
copyRunnerManagerUrlFlag(args, runnerBody);
copyOptionalFlag(args, runnerBody, "service-account-name", "serviceAccountName");
const runnerIdempotencyKey = optionalFlag(args, "runner-idempotency-key");
if (runnerIdempotencyKey) runnerBody.idempotencyKey = runnerIdempotencyKey;
runnerJob = await client(args).post(`/api/v1/runs/${encodeURIComponent(run.id)}/runner-jobs`, runnerBody);
}
return { action: "session-turn", sessionId, profile, run, command, runnerJob, pollCommands: { ps: `./scripts/agentrun sessions ps --reader-id cli --profile ${profile}`, show: `./scripts/agentrun sessions show ${sessionId} --reader-id cli`, trace: `./scripts/agentrun sessions trace ${sessionId} --after-seq 0 --limit 100`, output: `./scripts/agentrun sessions output ${sessionId} --after-seq 0 --limit 100`, read: `./scripts/agentrun sessions read ${sessionId} --reader-id cli`, steer: `./scripts/agentrun sessions steer ${sessionId} --prompt-file <file>`, cancel: `./scripts/agentrun sessions cancel ${sessionId}` } };
}
async function sessionSteer(args: ParsedArgs, sessionId: string): Promise<JsonRecord> {
const session = await client(args).get(`/api/v1/sessions/${encodeURIComponent(sessionId)}${readerQuery(args)}`) as SessionSummary;
const runId = session.activeRunId ?? session.lastRunId;
if (!runId) throw new AgentRunError("schema-invalid", `session ${sessionId} has no run to steer`, { httpStatus: 2 });
const prompt = await readPrompt(args);
const body: JsonRecord = { type: "steer", payload: { prompt } };
const idempotencyKey = optionalFlag(args, "idempotency-key");
if (idempotencyKey) body.idempotencyKey = idempotencyKey;
const command = await client(args).post(`/api/v1/runs/${encodeURIComponent(runId)}/commands`, body);
return { action: "session-steer", sessionId, runId, command };
if (commandIdempotencyKey) sendBody.commandIdempotencyKey = commandIdempotencyKey;
if (args.flags.get("dry-run") !== true) await ensureSessionForSend(args, sessionId, body.tenantId as string, body.projectId as string, profile);
const result = await client(args).post(`/api/v1/sessions/${encodeURIComponent(sessionId)}/send`, sendBody);
if (wantsExpandedOutput(args)) return { action: "session-send", compatibilityAlias: options.compatibilityAlias ?? null, result: result as JsonValue, valuesPrinted: false };
return summarizeSessionSendResult(result, sessionId, options.compatibilityAlias ?? null, profile);
}
async function sessionCancel(args: ParsedArgs, sessionId: string): Promise<JsonRecord> {
@@ -1296,25 +1343,11 @@ async function submitQueueTaskWithAipod(args: ParsedArgs, aipod: string): Promis
return client(args).post("/api/v1/queue/tasks", body);
}
async function sessionTurnWithAipod(args: ParsedArgs, positionalSessionId: string | null, aipod: string): Promise<JsonRecord> {
async function sessionSendWithAipod(args: ParsedArgs, positionalSessionId: string | null, aipod: string, options: { compatibilityAlias?: "turn" | "steer" } = {}): Promise<JsonRecord> {
const sessionId = positionalSessionId ?? optionalFlag(args, "session-id") ?? newSessionId();
const rendered = await renderAipodForCommand(args, aipod, positionalSessionId ? 3 : 2, { sessionId });
const task = rendered.queueTask;
const profile = String(task.backendProfile);
if (positionalSessionId || optionalFlag(args, "session-id")) {
try {
await client(args).get(`/api/v1/sessions/${encodeURIComponent(sessionId)}/storage`);
} catch {
const expiresInDays = Number(optionalFlag(args, "expires-in-days") ?? 30);
await client(args).post("/api/v1/sessions", {
sessionId,
tenantId: task.tenantId,
projectId: task.projectId,
backendProfile: task.backendProfile,
expiresAt: new Date(Date.now() + Math.max(1, expiresInDays) * 24 * 60 * 60 * 1000).toISOString(),
});
}
}
const sessionRef = objectField(task as unknown as JsonRecord, "sessionRef", {});
const metadata = objectField(sessionRef, "metadata", {});
const title = optionalFlag(args, "title") ?? task.title;
@@ -1330,26 +1363,21 @@ async function sessionTurnWithAipod(args: ParsedArgs, positionalSessionId: strin
resourceBundleRef: task.resourceBundleRef,
traceSink: { kind: "aipod-session", aipod, sessionId, valuesPrinted: false },
};
const run = await client(args).post("/api/v1/runs", runBody) as RunRecord;
const commandBody: JsonRecord = { type: "turn", payload: task.payload };
const runnerDefaults = jsonRecordValue(rendered.dispatchDefaults.runnerJob) ?? {};
const runnerBody = await sessionRunnerJobBody(args, runnerDefaults);
const sendBody: JsonRecord = {
run: runBody,
payload: task.payload,
createRunnerJob: args.flags.get("no-runner-job") !== true,
runnerJob: runnerBody,
dryRun: args.flags.get("dry-run") === true,
};
const commandIdempotencyKey = optionalFlag(args, "command-idempotency-key") ?? optionalFlag(args, "idempotency-key");
if (commandIdempotencyKey) commandBody.idempotencyKey = commandIdempotencyKey;
const command = await client(args).post(`/api/v1/runs/${encodeURIComponent(run.id)}/commands`, commandBody) as CommandRecord;
let runnerJob: JsonValue = null;
if (args.flags.get("no-runner-job") !== true) {
const runnerDefaults = jsonRecordValue(rendered.dispatchDefaults.runnerJob) ?? {};
const runnerOverrides = await optionalRunnerJsonFile(args);
const runnerBody = { ...runnerDefaults, ...runnerOverrides, commandId: command.id } as JsonRecord;
copyOptionalFlag(args, runnerBody, "image");
copyOptionalFlag(args, runnerBody, "namespace");
copyOptionalFlag(args, runnerBody, "attempt-id", "attemptId");
copyOptionalFlag(args, runnerBody, "runner-id", "runnerId");
copyOptionalFlag(args, runnerBody, "source-commit", "sourceCommit");
copyRunnerManagerUrlFlag(args, runnerBody);
copyOptionalFlag(args, runnerBody, "service-account-name", "serviceAccountName");
runnerJob = await client(args).post(`/api/v1/runs/${encodeURIComponent(run.id)}/runner-jobs`, runnerBody);
}
return { action: "session-turn", aipod, sessionId, profile, run, command, runnerJob, valuesPrinted: false, pollCommands: { ps: `./scripts/agentrun sessions ps --reader-id cli --profile ${profile}`, show: `./scripts/agentrun sessions show ${sessionId} --reader-id cli`, trace: `./scripts/agentrun sessions trace ${sessionId} --after-seq 0 --limit 100`, output: `./scripts/agentrun sessions output ${sessionId} --after-seq 0 --limit 100`, read: `./scripts/agentrun sessions read ${sessionId} --reader-id cli`, steer: `./scripts/agentrun sessions steer ${sessionId} --prompt-file <file>`, cancel: `./scripts/agentrun sessions cancel ${sessionId}` } };
if (commandIdempotencyKey) sendBody.commandIdempotencyKey = commandIdempotencyKey;
if (args.flags.get("dry-run") !== true) await ensureSessionForSend(args, sessionId, String(task.tenantId), String(task.projectId), profile);
const result = await client(args).post(`/api/v1/sessions/${encodeURIComponent(sessionId)}/send`, sendBody);
if (wantsExpandedOutput(args)) return { action: "session-send", compatibilityAlias: options.compatibilityAlias ?? null, aipod, result: result as JsonValue, valuesPrinted: false };
return summarizeSessionSendResult(result, sessionId, options.compatibilityAlias ?? null, profile, aipod);
}
async function submitQueueTask(args: ParsedArgs): Promise<JsonValue> {
@@ -2011,8 +2039,7 @@ function help(args: ParsedArgs, group?: string): JsonRecord {
"sessions storage <sessionId>",
"sessions storage <sessionId> --delete",
"sessions show <sessionId> [--reader-id <reader>]",
"sessions turn [sessionId] [--aipod <name>|--json-stdin|--json-file <run-base.json>] [--prompt-stdin|--prompt-file <file>|--prompt <text>] [--profile codex|deepseek|minimax-m3|dsflash-go|<dynamic-profile>|M3] [--runner-json-stdin|--runner-json-file <job.json>] [--no-runner-job]",
"sessions steer <sessionId> [--prompt-stdin|--prompt-file <file>|--prompt <text>]",
"sessions send [sessionId] [--aipod <name>|--json-stdin|--json-file <run-base.json>] [--prompt-stdin|--prompt-file <file>|--prompt <text>] [--profile codex|deepseek|minimax-m3|dsflash-go|<dynamic-profile>|M3] [--runner-json-stdin|--runner-json-file <job.json>] [--no-runner-job] [--dry-run]",
"sessions cancel <sessionId> [--reason <text>] [--full|--raw]",
"sessions trace <sessionId> [--after-seq <n>] [--limit <n>] [--run-id <runId>] [--summary-chars <n>] [--include-output] [--seq <n>|--event-id <id>|--item-id <id>] [--detail-scan-pages <n>] [--full|--raw]",
"sessions output <sessionId> [--after-seq <n>] [--limit <n>] [--run-id <runId>] [--summary-chars <n>] [--include-output] [--seq <n>|--event-id <id>|--item-id <id>] [--detail-scan-pages <n>] [--full|--raw]",