修复 queue dry-run 与默认摘要输出

This commit is contained in:
Codex
2026-06-10 08:29:57 +08:00
parent 423d841298
commit e0f1a142f4
4 changed files with 302 additions and 40 deletions
+230 -17
View File
@@ -80,13 +80,13 @@ async function dispatch(args: ParsedArgs): Promise<CliResult> {
if (sessionStorageCmd && id) return sessionStorageDelete(args, id);
if (group === "queue" && command === "submit") return submitQueueTask(args);
if (group === "queue" && command === "list") return listQueueTasks(args);
if (group === "queue" && command === "show" && id) return client(args).get(`/api/v1/queue/tasks/${encodeURIComponent(id)}`);
if (group === "queue" && command === "show" && id) return showQueueTask(args, id);
if (group === "queue" && command === "stats") return client(args).get(`/api/v1/queue/stats${queueQuery(args)}`);
if (group === "queue" && command === "commander") return client(args).get(`/api/v1/queue/commander${queueQuery(args, { readerId: true })}`);
if (group === "queue" && command === "read" && id) return client(args).post(`/api/v1/queue/tasks/${encodeURIComponent(id)}/read`, { readerId: optionalFlag(args, "reader-id") ?? "cli" });
if (group === "queue" && command === "cancel" && id) return client(args).post(`/api/v1/queue/tasks/${encodeURIComponent(id)}/cancel`, cancelBody(args));
if (group === "queue" && command === "commander") return queueCommander(args);
if (group === "queue" && command === "read" && id) return readQueueTask(args, id);
if (group === "queue" && command === "cancel" && id) return cancelQueueTask(args, id);
if (group === "queue" && command === "dispatch" && id) return dispatchQueueTask(args, id);
if (group === "queue" && command === "refresh" && id) return client(args).post(`/api/v1/queue/tasks/${encodeURIComponent(id)}/refresh`, {});
if (group === "queue" && command === "refresh" && id) return refreshQueueTask(args, id);
if (group === "runs" && command === "create") return client(args).post("/api/v1/runs", await jsonFile(args));
if (group === "runs" && command === "show" && id) return client(args).get(`/api/v1/runs/${encodeURIComponent(id)}`);
if (group === "runs" && command === "events" && id) return runEvents(args, id);
@@ -420,6 +420,169 @@ function summarizeSessionMutationResult(action: "session-cancel" | "session-read
};
}
interface QueueSummaryOptions {
limit: number;
}
export function summarizeQueueTaskListResult(result: JsonValue, options: QueueSummaryOptions): JsonRecord {
const record = jsonRecordValue(result);
if (!record) throw new AgentRunError("schema-invalid", "queue list response must be an object", { httpStatus: 2 });
const items = queueItems(record.items);
const selected = items.slice(0, options.limit);
return {
action: "queue-list-summary",
queue: stringValue(record.queue),
state: stringValue(record.state),
cursor: stringValue(record.cursor),
nextCursor: stringValue(record.nextCursor),
sourceCount: items.length,
displayedCount: selected.length,
limit: options.limit,
items: selected.map((item) => summarizeQueueTaskWithAttempt(item, stringValue(item.id) ?? "unknown")),
fullResponseBytes: jsonByteLength(result),
valuesPrinted: false,
drillDownCommands: {
full: "./scripts/agentrun queue list --full",
raw: "./scripts/agentrun queue list --raw",
next: stringValue(record.nextCursor) ? `./scripts/agentrun queue list --cursor ${String(record.nextCursor)} --limit ${options.limit}` : null,
},
};
}
export function summarizeQueueTaskShowResult(result: JsonValue, taskId: string): JsonRecord {
const record = jsonRecordValue(result);
if (!record) throw new AgentRunError("schema-invalid", "queue show response must be an object", { httpStatus: 2 });
const sessionId = stringValue(jsonRecordValue(record.sessionRef)?.sessionId) ?? stringValue(jsonRecordValue(record.latestAttempt)?.sessionId);
return {
action: "queue-show-summary",
task: summarizeQueueTaskWithAttempt(record, taskId),
payloadBytes: jsonByteLength(record.payload),
resourceBundleBytes: jsonByteLength(record.resourceBundleRef),
referencesCount: Array.isArray(record.references) ? record.references.length : null,
metadataKeys: Object.keys(jsonRecordValue(record.metadata) ?? {}).sort().slice(0, 24),
fullResponseBytes: jsonByteLength(result),
valuesPrinted: false,
pollCommands: {
full: `./scripts/agentrun queue show ${taskId} --full`,
...(sessionId ? { trace: `./scripts/agentrun sessions trace ${sessionId} --after-seq 0 --limit 100`, output: `./scripts/agentrun sessions output ${sessionId} --after-seq 0 --limit 100` } : {}),
},
};
}
export function summarizeQueueCommanderSnapshot(result: JsonValue, options: QueueSummaryOptions): JsonRecord {
const record = jsonRecordValue(result);
if (!record) throw new AgentRunError("schema-invalid", "queue commander response must be an object", { httpStatus: 2 });
const items = queueItems(record.items);
const selected = items.slice(0, options.limit);
return {
action: "queue-commander-summary",
queue: stringValue(record.queue),
readerId: stringValue(record.readerId),
stats: summarizeQueueStats(jsonRecordValue(record.stats)),
sourceCount: items.length,
displayedCount: selected.length,
limit: options.limit,
unreadCount: items.filter((item) => item.unread === true).length,
activeCount: items.filter((item) => item.active === true || stringValue(item.attentionState) === "active").length,
items: selected.map((item) => summarizeQueueTaskWithAttempt(item, stringValue(item.id) ?? "unknown")),
generatedAt: stringValue(record.generatedAt),
fullResponseBytes: jsonByteLength(result),
valuesPrinted: false,
drillDownCommands: {
full: "./scripts/agentrun queue commander --reader-id cli --full",
raw: "./scripts/agentrun queue commander --reader-id cli --raw",
item: "./scripts/agentrun queue show <taskId>",
trace: "./scripts/agentrun sessions trace <sessionId> --after-seq 0 --limit 100",
output: "./scripts/agentrun sessions output <sessionId> --after-seq 0 --limit 100",
},
};
}
function summarizeQueueTaskMutationResult(action: "queue-read" | "queue-cancel" | "queue-refresh", taskId: string, result: JsonValue, flags: JsonRecord): JsonRecord {
const record = jsonRecordValue(result);
return {
action,
taskId,
mutation: true,
...flags,
task: summarizeQueueTaskWithAttempt(record, taskId),
fullResponseBytes: jsonByteLength(result),
valuesPrinted: false,
drillDownCommands: {
show: `./scripts/agentrun queue show ${taskId}`,
full: `./scripts/agentrun queue show ${taskId} --full`,
},
};
}
function queueMutationDryRunPlan(action: string, taskId: string | null, pathValue: string, body: JsonRecord, method: "POST", confirmCommand: string, task?: JsonValue): JsonRecord {
return {
action: `${action}-plan`,
dryRun: true,
mutation: false,
taskId,
request: {
method,
path: pathValue,
body: summarizeMutationBody(body),
bodyBytes: jsonByteLength(body),
valuesPrinted: false,
},
...(task === undefined ? {} : { task: summarizeQueueTaskWithAttempt(jsonRecordValue(task), taskId ?? stringValue(jsonRecordValue(task)?.id) ?? "unknown") }),
next: {
confirm: confirmCommand,
note: "Remove --dry-run to perform the mutation.",
},
valuesPrinted: false,
};
}
function summarizeMutationBody(body: JsonRecord): JsonRecord {
return {
...compactRecord(body, { keys: ["idempotencyKey", "image", "namespace", "attemptId", "runnerId", "sourceCommit", "managerUrl", "serviceAccountName", "readerId", "reason"] }),
keys: Object.keys(body).sort().slice(0, 32),
payloadBytes: jsonByteLength(body.payload),
executionPolicyBytes: jsonByteLength(body.executionPolicy),
resourceBundleBytes: jsonByteLength(body.resourceBundleRef),
valuesPrinted: false,
};
}
function summarizeQueueStats(record: JsonRecord | null): JsonRecord | null {
if (!record) return null;
return {
queue: stringValue(record.queue),
total: numberValue(record.total),
maxVersion: numberValue(record.maxVersion),
byState: jsonRecordValue(record.byState) ?? {},
byLane: jsonRecordValue(record.byLane) ?? {},
byBackendProfile: jsonRecordValue(record.byBackendProfile) ?? {},
generatedAt: stringValue(record.generatedAt),
fullRecordBytes: jsonByteLength(record),
valuesPrinted: false,
};
}
function summarizeQueueTaskWithAttempt(record: JsonRecord | null, fallbackTaskId: string): JsonRecord {
const summary = summarizeQueueTaskRecord(record, fallbackTaskId);
const latestAttempt = summarizeAttemptRecord(jsonRecordValue(record?.latestAttempt));
const sessionRef = jsonRecordValue(record?.sessionRef);
if (latestAttempt) summary.latestAttempt = latestAttempt;
const sessionId = stringValue(sessionRef?.sessionId) ?? stringValue(latestAttempt?.sessionId);
if (sessionId) summary.sessionId = sessionId;
if (record?.readCursor !== undefined) summary.read = record.readCursor !== null;
return summary;
}
function queueItems(value: JsonValue | undefined): JsonRecord[] {
if (!Array.isArray(value)) throw new AgentRunError("schema-invalid", "queue response.items must be an array", { httpStatus: 2 });
return value.map((item) => {
const record = jsonRecordValue(item);
if (!record) throw new AgentRunError("schema-invalid", "queue response item must be an object", { httpStatus: 2 });
return record;
});
}
function sessionEventDrillDownCommands(kind: "trace" | "output", sessionId: string, options: { afterSeq: number; limit: number; runId: string | null }): JsonRecord {
const base = `./scripts/agentrun sessions ${kind} ${sessionId} --after-seq ${options.afterSeq} --limit ${options.limit}${options.runId ? ` --run-id ${options.runId}` : ""}`;
return {
@@ -432,7 +595,7 @@ function sessionEventDrillDownCommands(kind: "trace" | "output", sessionId: stri
function summarizeQueueTaskRecord(record: JsonRecord | null, fallbackTaskId: string): JsonRecord {
return compactRecord(record, {
fallback: { id: fallbackTaskId },
keys: ["id", "state", "queue", "lane", "title", "priority", "backendProfile", "providerId", "sessionPath", "version", "updatedAt", "cancelledAt", "cancelReason"],
keys: ["id", "state", "queue", "lane", "title", "priority", "backendProfile", "providerId", "sessionPath", "version", "updatedAt", "cancelledAt", "cancelReason", "readerId", "attentionState", "unread", "active"],
});
}
@@ -684,6 +847,9 @@ async function submitQueueTask(args: ParsedArgs): Promise<JsonValue> {
const body = await jsonFile(args);
const idempotencyKey = optionalFlag(args, "idempotency-key");
if (idempotencyKey) body.idempotencyKey = idempotencyKey;
if (args.flags.get("dry-run") === true) {
return queueMutationDryRunPlan("queue-submit", null, "/api/v1/queue/tasks", body, "POST", "./scripts/agentrun queue submit --json-file <task.json>");
}
return client(args).post("/api/v1/queue/tasks", body);
}
@@ -700,7 +866,21 @@ async function listQueueTasks(args: ParsedArgs): Promise<JsonValue> {
if (limit) params.set("limit", limit);
if (updatedAfter) params.set("updatedAfter", updatedAfter);
const query = params.toString();
return client(args).get(`/api/v1/queue/tasks${query ? `?${query}` : ""}`);
const result = await client(args).get(`/api/v1/queue/tasks${query ? `?${query}` : ""}`);
if (wantsExpandedOutput(args)) return result;
return summarizeQueueTaskListResult(result, { limit: integerFlag(args, "limit", 20, { min: 1, max: 100 }) });
}
async function showQueueTask(args: ParsedArgs, taskId: string): Promise<JsonValue> {
const result = await client(args).get(`/api/v1/queue/tasks/${encodeURIComponent(taskId)}`);
if (wantsExpandedOutput(args)) return result;
return summarizeQueueTaskShowResult(result, taskId);
}
async function queueCommander(args: ParsedArgs): Promise<JsonValue> {
const result = await client(args).get(`/api/v1/queue/commander${queueQuery(args, { readerId: true })}`);
if (wantsExpandedOutput(args)) return result;
return summarizeQueueCommanderSnapshot(result, { limit: integerFlag(args, "limit", 20, { min: 1, max: 100 }) });
}
function queueQuery(args: ParsedArgs, options: { readerId?: boolean } = {}): string {
@@ -716,6 +896,17 @@ function queueQuery(args: ParsedArgs, options: { readerId?: boolean } = {}): str
}
async function dispatchQueueTask(args: ParsedArgs, taskId: string): Promise<JsonValue> {
const body = await queueDispatchBody(args);
if (args.flags.get("dry-run") === true) {
const task = await client(args).get(`/api/v1/queue/tasks/${encodeURIComponent(taskId)}`);
return queueMutationDryRunPlan("queue-dispatch", taskId, `/api/v1/queue/tasks/${encodeURIComponent(taskId)}/dispatch`, body, "POST", `./scripts/agentrun queue dispatch ${taskId} --json-file <dispatch.json>`, task);
}
const result = await client(args).post(`/api/v1/queue/tasks/${encodeURIComponent(taskId)}/dispatch`, body);
if (wantsExpandedOutput(args)) return result;
return summarizeQueueDispatchResult(result, taskId);
}
async function queueDispatchBody(args: ParsedArgs): Promise<JsonRecord> {
const body = await optionalJsonFile(args);
const copy = (flagName: string, key = flagName.replace(/-([a-z])/gu, (_, letter: string) => letter.toUpperCase())): void => {
const value = optionalFlag(args, flagName);
@@ -729,9 +920,31 @@ async function dispatchQueueTask(args: ParsedArgs, taskId: string): Promise<Json
copy("source-commit", "sourceCommit");
copyRunnerManagerUrlFlag(args, body);
copy("service-account-name", "serviceAccountName");
const result = await client(args).post(`/api/v1/queue/tasks/${encodeURIComponent(taskId)}/dispatch`, body);
return body;
}
async function readQueueTask(args: ParsedArgs, taskId: string): Promise<JsonValue> {
const body = { readerId: optionalFlag(args, "reader-id") ?? "cli" };
if (args.flags.get("dry-run") === true) return queueMutationDryRunPlan("queue-read", taskId, `/api/v1/queue/tasks/${encodeURIComponent(taskId)}/read`, body, "POST", `./scripts/agentrun queue read ${taskId}`);
const result = await client(args).post(`/api/v1/queue/tasks/${encodeURIComponent(taskId)}/read`, body);
if (wantsExpandedOutput(args)) return result;
return summarizeQueueDispatchResult(result, taskId);
return summarizeQueueTaskMutationResult("queue-read", taskId, result, { read: true });
}
async function cancelQueueTask(args: ParsedArgs, taskId: string): Promise<JsonValue> {
const body = cancelBody(args);
if (args.flags.get("dry-run") === true) return queueMutationDryRunPlan("queue-cancel", taskId, `/api/v1/queue/tasks/${encodeURIComponent(taskId)}/cancel`, body, "POST", `./scripts/agentrun queue cancel ${taskId}${body.reason ? " --reason <text>" : ""}`);
const result = await client(args).post(`/api/v1/queue/tasks/${encodeURIComponent(taskId)}/cancel`, body);
if (wantsExpandedOutput(args)) return result;
return summarizeQueueTaskMutationResult("queue-cancel", taskId, result, { cancelled: true });
}
async function refreshQueueTask(args: ParsedArgs, taskId: string): Promise<JsonValue> {
const body: JsonRecord = {};
if (args.flags.get("dry-run") === true) return queueMutationDryRunPlan("queue-refresh", taskId, `/api/v1/queue/tasks/${encodeURIComponent(taskId)}/refresh`, body, "POST", `./scripts/agentrun queue refresh ${taskId}`);
const result = await client(args).post(`/api/v1/queue/tasks/${encodeURIComponent(taskId)}/refresh`, body);
if (wantsExpandedOutput(args)) return result;
return summarizeQueueTaskMutationResult("queue-refresh", taskId, result, { refreshed: true });
}
async function showRunnerJobStatus(args: ParsedArgs): Promise<JsonValue> {
@@ -1245,15 +1458,15 @@ function help(args: ParsedArgs, group?: string): JsonRecord {
"runner job --dry-run --run-id <runId> --command-id <commandId> --image <image>",
"runner jobs --run-id <runId> [--command-id <commandId>]",
"runner job-status [runnerJobId] --run-id <runId>",
"queue submit --json-file <task.json> [--idempotency-key <key>]",
"queue list [--queue <queue>] [--state <state>] [--cursor <cursor>] [--limit <limit>] [--updated-after <version>]",
"queue show <taskId>",
"queue submit --json-file <task.json> [--idempotency-key <key>] [--dry-run]",
"queue list [--queue <queue>] [--state <state>] [--cursor <cursor>] [--limit <limit>] [--updated-after <version>] [--full|--raw]",
"queue show <taskId> [--full|--raw]",
"queue stats [--queue <queue>]",
"queue commander [--queue <queue>] [--reader-id <reader>]",
"queue read <taskId> [--reader-id <reader>]",
"queue cancel <taskId> [--reason <text>]",
"queue dispatch <taskId> [--json-file <dispatch.json>] [--idempotency-key <key>] [--image <image>] [--namespace <namespace>] [--full|--raw]",
"queue refresh <taskId>",
"queue commander [--queue <queue>] [--reader-id <reader>] [--limit <display-limit>] [--full|--raw]",
"queue read <taskId> [--reader-id <reader>] [--dry-run] [--full|--raw]",
"queue cancel <taskId> [--reason <text>] [--dry-run] [--full|--raw]",
"queue dispatch <taskId> [--json-file <dispatch.json>] [--idempotency-key <key>] [--image <image>] [--namespace <namespace>] [--dry-run] [--full|--raw]",
"queue refresh <taskId> [--dry-run] [--full|--raw]",
"secrets codex render --dry-run [--profile codex|deepseek|minimax-m3|dsflash-go|<dynamic-profile>] [--codex-home <dir>] [--model-catalog-file <file>] [--namespace agentrun-v01] [--secret-name <name>]",
"provider-profiles list",
"provider-profiles show <profile>",