fix(code-queue): cap judge retries and trace judge prompts

This commit is contained in:
lyon
2026-05-15 13:17:10 +08:00
parent 17f6389623
commit 83ad5cac05
6 changed files with 211 additions and 33 deletions
+1
View File
@@ -62,6 +62,7 @@ Tag 是 OA 事件流的订阅和投影索引。所有 tag 必须是稳定字符
- 运行中每个新的 TraceView 可见执行行都必须发布 `trace-step-created`,并带 `task:<taskId>`、`queue:<queueId>`、`attempt:<index>`、`service:code-queue`、`trace` tag,以及 payload 中的 `scopeId=task:<taskId>`、`attemptIndex`、`attemptScopeId=task:<taskId>:attempt:<index>`;统计中心据此幂等更新任务级累计统计和 attempt 级独立统计,message/error 行增加 STEP 或 error;system 行默认仅保留在任务原始输出/数据库中,不进入 STEP 计数且不伪装为工具调用。
- Trace Summary 顶部执行摘要读取任务级 `task:<taskId>`;执行过程摘要 `#<index>` 读取 `task:<taskId>:attempt:<index>`。如果 attempt scope 尚未投影完成,必须显示 `statsSource=unavailable`、`--`,不得回退到任务级累计统计、transcript 重算或旧本地字段。
- 任务入队、开始、终态、移动 queue、标记已读等状态事实必须发布 `task-updated` 或更具体的事实事件,供事件表和后续审计使用。
- Code Queue judge 每次真正发起 MiniMax LLM 请求前必须发布 `judge-llm-request` 诊断事件,tag 至少包含 `service:code-queue`、`task:<taskId>`、`attempt:<index>`、`judge`、`diagnostics`;payload 必须包含不带 API key 的最终 request payload/messages、prompt/payload 尺寸和 repairAttempt;随后发布 `judge-llm-response`、`judge-json-parse-error`(如有)和 `judge-result`,用于追溯“最终发给 judge LLM 的 prompt”和安全覆盖结果。
- Code Queue 服务启动后可对 PostgreSQL 中已有任务回放每个 TraceView 可见执行行的 `trace-step-created`,并发布 `trace-stats-snapshot` 事件完成统计中心种子同步;回放必须使用相同 `eventId` 保持幂等,不得阻塞队列恢复。历史回放必须按 attempt start 行推导 `attemptIndex`,确保重建投影时 attempt scope 可独立恢复。
## Pipeline Integration
@@ -26,7 +26,7 @@ services:
CODE_QUEUE_MODEL_REASONING_EFFORTS: "${CODE_QUEUE_MODEL_REASONING_EFFORTS:-gpt-5.5=xhigh}"
CODE_QUEUE_SANDBOX: "${CODE_QUEUE_SANDBOX:-danger-full-access}"
CODE_QUEUE_APPROVAL_POLICY: "${CODE_QUEUE_APPROVAL_POLICY:-never}"
CODE_QUEUE_MAX_ATTEMPTS: "${CODE_QUEUE_MAX_ATTEMPTS:-99}"
CODE_QUEUE_MAX_ATTEMPTS: "${CODE_QUEUE_MAX_ATTEMPTS:-10}"
CODE_QUEUE_MAX_ACTIVE_QUEUES: "${CODE_QUEUE_MAX_ACTIVE_QUEUES:-0}"
CODE_QUEUE_DATABASE_POOL_MAX: "${CODE_QUEUE_DATABASE_POOL_MAX:-2}"
NODE_OPTIONS: "${CODE_QUEUE_NODE_OPTIONS:---max-old-space-size=1024}"
@@ -116,6 +116,7 @@ import { ReferenceTaskLookupError, configureReferences, injectReferencedTaskCont
import {
applyOaTraceStatsToTaskJson,
configureOaEvents,
publishCodeQueueJudgeEvent,
publishCodeQueueQueueUpdated,
publishCodeQueueTaskUpdated,
publishCodeQueueTraceStatsSnapshot,
@@ -159,7 +160,7 @@ const serviceStartedAt = new Date().toISOString();
const defaultQueueId = "default";
const judgeFailRetryLimit = 3;
const fallbackJudgeRetryLimit = 3;
const maxTaskAttempts = 99;
const maxTaskAttempts = 10;
const referenceInjectionMaxRounds: number | null = null;
const retryBackoffBaseMs = 1000;
const retryBackoffMaxMs = 10 * 60 * 1000;
@@ -238,6 +239,19 @@ function envNonNegativeNumber(name: string, fallback: number): number {
return Number.isFinite(value) && value >= 0 ? Math.floor(value) : fallback;
}
/**
 * Normalizes an attempt budget to a whole number between 1 and `maxTaskAttempts`.
 *
 * @param value - Requested attempt budget; coerced with `Number()` before validation.
 * @returns The floored value limited to `[1, maxTaskAttempts]`, or `maxTaskAttempts`
 *          when the coerced input is not finite.
 */
function clampTaskAttempts(value: number): number {
  const requested = Number(value);
  if (Number.isFinite(requested)) {
    const whole = Math.floor(requested);
    // Apply the ceiling first, then the floor of one attempt.
    const capped = Math.min(maxTaskAttempts, whole);
    return Math.max(1, capped);
  }
  return maxTaskAttempts;
}
/**
 * Tries to make room for one more attempt on `task`.
 *
 * @param task - Task whose attempt budget is being extended; `maxAttempts` is mutated on success.
 * @returns `false`, leaving the task untouched, when the next attempt number would exceed
 *          `maxTaskAttempts`; otherwise `true` after setting `task.maxAttempts` to the larger
 *          of its current value and the next attempt number, passed through `clampTaskAttempts`.
 */
function reserveNextAttemptBudget(task: QueueTask): boolean {
  const upcomingAttempt = task.attempts.length + 1;
  if (upcomingAttempt <= maxTaskAttempts) {
    const wantedBudget = Math.max(task.maxAttempts, upcomingAttempt);
    task.maxAttempts = clampTaskAttempts(wantedBudget);
    return true;
  }
  return false;
}
function envBool(name: string, fallback: boolean): boolean {
const raw = process.env[name];
if (raw === undefined || raw.trim().length === 0) return fallback;
@@ -328,7 +342,7 @@ function readConfig(): RuntimeConfig {
modelReasoningEfforts: envModelReasoningEfforts("CODE_QUEUE_MODEL_REASONING_EFFORTS", { "gpt-5.5": "xhigh" }),
sandbox: sandboxValue(envString("CODE_QUEUE_SANDBOX", "danger-full-access")),
approvalPolicy: approvalValue(envString("CODE_QUEUE_APPROVAL_POLICY", "never")),
defaultMaxAttempts: Math.max(1, Math.min(maxTaskAttempts, envNumber("CODE_QUEUE_MAX_ATTEMPTS", maxTaskAttempts))),
defaultMaxAttempts: clampTaskAttempts(envNumber("CODE_QUEUE_MAX_ATTEMPTS", maxTaskAttempts)),
codeModels,
minimaxApiKey: envString("MINIMAX_API_KEY", ""),
minimaxApiBase: envString("MINIMAX_API_BASE", "https://api.minimaxi.com/v1").replace(/\/+$/u, ""),
@@ -840,6 +854,7 @@ function normalizeTask(task: QueueTask): QueueTask {
task.executionMode = normalizeCodeExecutionMode(task.executionMode);
task.cwd = resolveTaskCwd(task.providerId, task.cwd);
task.reasoningEffort = resolveReasoningEffort(task.model, task.reasoningEffort);
task.maxAttempts = clampTaskAttempts(task.maxAttempts || config.defaultMaxAttempts);
task.basePrompt ||= userPromptForDisplay(task.prompt);
task.referenceTaskIds ??= referenceTaskIdsFromPrompt(task.prompt);
task.referenceInjection ??= null;
@@ -1785,7 +1800,7 @@ function normalizeRequest(value: unknown): QueueTaskRequest {
if (typeof record.model === "string" && record.model.length > 0) request.model = record.model;
if (typeof record.reasoningEffort === "string" && record.reasoningEffort.length > 0) request.reasoningEffort = record.reasoningEffort;
if (typeof record.executionMode === "string" && record.executionMode.length > 0) request.executionMode = normalizeCodeExecutionMode(record.executionMode);
if (typeof record.maxAttempts === "number" && Number.isInteger(record.maxAttempts) && record.maxAttempts > 0) request.maxAttempts = Math.min(maxTaskAttempts, record.maxAttempts);
if (typeof record.maxAttempts === "number" && Number.isInteger(record.maxAttempts) && record.maxAttempts > 0) request.maxAttempts = clampTaskAttempts(record.maxAttempts);
const referenceTaskIds = collectReferenceTaskIds(record, record.prompt);
if (referenceTaskIds.length > 0) request.referenceTaskIds = referenceTaskIds;
return request;
@@ -2178,6 +2193,7 @@ configureJudge({
extractString,
promptLineCount,
judgeFailRetryLimit,
publishJudgeEvent: (task, type, payload) => publishCodeQueueJudgeEvent(task, queueIdOf(task), type, payload),
});
configureCodexPort({
@@ -2648,7 +2664,15 @@ function queueActiveTasksForRestartRetry(reason: string, method: string): number
task.lastError = reason;
task.nextMode = "retry";
task.nextPrompt = queueRecoveryRetryPrompt(task, reason);
task.maxAttempts = Math.max(task.maxAttempts, maxTaskAttempts, task.attempts.length + 1);
if (!reserveNextAttemptBudget(task)) {
task.status = "failed";
task.finishedAt = nowIso();
task.nextMode = null;
task.nextPrompt = null;
task.lastError = `Max attempts reached (${maxTaskAttempts}) before restart recovery. ${reason}`;
appendOutput(task, "error", `${task.lastError}\n`, method);
continue;
}
setAttemptFeedbackPrompt(task.attempts.at(-1), task.nextPrompt, "queue-recovery-retry", task.attempts.length + 1);
task.updatedAt = nowIso();
appendOutput(task, "system", `${reason}; task queued for retry\n`, method);
@@ -2686,7 +2710,7 @@ async function runTask(task: QueueTask): Promise<void> {
return;
}
armIdleNotification();
task.maxAttempts = Math.max(task.maxAttempts, maxTaskAttempts);
task.maxAttempts = clampTaskAttempts(task.maxAttempts || config.defaultMaxAttempts);
task.startedAt ??= nowIso();
task.lastError = null;
while (task.attempts.length < task.maxAttempts && !task.cancelRequested && !shutdownRequested) {
@@ -2754,14 +2778,13 @@ async function runTask(task: QueueTask): Promise<void> {
}
if (judge.decision === "fail") {
task.judgeFailCount += 1;
if (!explicitUserInterrupt(task, result) && task.judgeFailCount < judgeFailRetryLimit) {
if (!explicitUserInterrupt(task, result) && task.judgeFailCount < judgeFailRetryLimit && reserveNextAttemptBudget(task)) {
task.status = "retry_wait";
task.finishedAt = null;
task.readAt = null;
const nextPrompt = judgeFailContinuationPrompt(task, judge, task.judgeFailCount);
task.nextPrompt = nextPrompt;
task.nextMode = "retry";
task.maxAttempts = Math.max(task.maxAttempts, task.attempts.length + 1);
setAttemptFeedbackPrompt(latestAttempt, nextPrompt, "judge-fail-retry", task.attempts.length + 1);
task.updatedAt = nowIso();
appendOutput(task, "system", `judge=fail treated as retry (${task.judgeFailCount}/${judgeFailRetryLimit}); appending continuation prompt to existing session\n`, "queue");
@@ -2793,6 +2816,7 @@ async function runTask(task: QueueTask): Promise<void> {
failTaskForFallbackRetryLimit(task, judge);
return;
}
if (task.attempts.length >= task.maxAttempts) break;
task.status = "retry_wait";
task.finishedAt = null;
task.readAt = null;
@@ -3351,7 +3375,7 @@ async function manualRetry(task: QueueTask, req: Request): Promise<Response> {
task.readAt = null;
task.cancelRequested = false;
task.lastError = null;
task.maxAttempts = Math.max(task.maxAttempts, maxTaskAttempts, task.attempts.length + 1);
task.maxAttempts = Math.max(task.maxAttempts, task.attempts.length + 1);
task.nextMode = "retry";
task.nextPrompt = explicitPrompt.length > 0 ? explicitPrompt : retryPrompt(task, { decision: "retry", confidence: 1, reason: "Manual retry", source: "fallback" });
setAttemptFeedbackPrompt(task.attempts.at(-1), task.nextPrompt, explicitPrompt.length > 0 ? "manual-retry-explicit" : "manual-retry-generated", task.attempts.length + 1);
@@ -212,7 +212,7 @@ export const defaultJudgeProbeCases: JudgeProbeCase[] = [
},
{
channel: "system",
text: "attempt 2/99 queue=constar provider=main-server cwd=/root/unidesk mode=retry model=gpt-5.5\n",
text: "attempt 2/10 queue=constar provider=main-server cwd=/root/unidesk mode=retry model=gpt-5.5\n",
method: "queue",
},
{
@@ -352,6 +352,32 @@ export const defaultJudgeProbeCases: JudgeProbeCase[] = [
],
events: [{ at: nowIso(), method: "turn/completed", status: "completed" }],
},
{
id: "code_queue_no_build_recreate_live_verified_should_complete",
prompt: "修复 codex_1778762250800_82e71d 展开执行过程摘要 #1 只有 5 个 STEP 的问题;本地测试 OK 后部署到 D601 Code Queue,并验证 remote/frontdoor 能看到完整 STEP。",
finalResponse: "已修复 Code Queue trace-steps 的 OA Event Flow 分页读取问题,并完成上线验证。\n\n部署结果:先执行 `docker compose -f src/components/microservices/code-queue/docker-compose.d601.yml up -d --build --force-recreate code-queue`,后续一次 build 受 Docker Hub mirror EOF 影响失败;为完成上线,已把当前修复同步进运行容器、commit 成 `unidesk-code-queue:d601`,并执行 `docker compose -f src/components/microservices/code-queue/docker-compose.d601.yml up -d --force-recreate code-queue`,容器 `code-queue-backend` 已 Startedstatus=runninghealth=ok。\n\nLive VerificationD601 本机 `/api/tasks/codex_1778762250800_82e71d/trace-steps?afterSeq=0&limit=500&attempt=1` 返回 `source=oa-event-flow,total=153,returned=153,hasMore=false`main server microservice proxy 返回 httpStatus=200 且 total=153;公网 served frontend direct API `/api/code-queue-direct/.../trace-steps?afterSeq=0&limit=500&attempt=1` 返回 source=oa-event-flow,total=153`/trace-summary` 返回 stepCount=153, statsSource=oa-event-flow。\n\nDockerfile 已补齐依赖,后续重建可直接跑;建议后续修 Docker daemon registry mirror,避免 rebuild 偶发 EOF。",
expected: "complete",
terminalStatus: "completed",
outputs: [
{
channel: "user",
text: "修复 codex_1778762250800_82e71d 展开执行过程摘要 #1 只有 5 个 STEP 的问题;本地测试 OK 后部署到 D601 Code Queue,并验证 remote/frontdoor 能看到完整 STEP。\n",
method: "enqueue",
},
{ channel: "diff", text: "item/completed: file changes status=completed; M src/components/microservices/code-queue/src/oa-events.ts M src/components/microservices/code-queue/src/task-view.ts M src/components/microservices/code-queue/Dockerfile", method: "item/completed" },
{ channel: "command", text: "item/completed: bun run --cwd src/components/microservices/code-queue check status=completed; ok=true", method: "item/completed" },
{ channel: "command", text: "item/completed: docker compose up -d --build --force-recreate code-queue status=failed; Docker Hub mirror EOF", method: "item/completed" },
{ channel: "command", text: "item/completed: docker commit code-queue-backend unidesk-code-queue:d601 && docker compose up -d --force-recreate code-queue status=completed; code-queue-backend Started running healthy", method: "item/completed" },
{ channel: "command", text: "item/completed: trace-steps?afterSeq=0&limit=500&attempt=1 status=completed; source=oa-event-flow total=153 returned=153 hasMore=false", method: "item/completed" },
{
channel: "assistant",
text: "已修复并上线。build 曾因 Docker Hub mirror EOF 失败,但已通过 docker commit + no-build force-recreate 挂载到运行中 code-queue-backendlive API、microservice proxy 和公网 served frontend direct API 都返回 source=oa-event-flow,total=153,returned=153。Dockerfile 已补齐依赖,后续重建可直接跑;建议后续修 registry mirror。",
method: "item/agentMessage/delta",
},
{ channel: "system", text: "turn completed status=completed\n", method: "turn/completed" },
],
events: [{ at: nowIso(), method: "turn/completed", status: "completed" }],
},
{
id: "transport_closed_before_terminal",
prompt: "Refactor the queue worker and run the focused tests.",
@@ -32,6 +32,7 @@ export interface JudgeRuntimeContext {
extractString: (value: unknown, key: string) => string | null;
promptLineCount: (text: string) => number;
judgeFailRetryLimit: number;
publishJudgeEvent?: (task: QueueTask, type: string, payload: Record<string, JsonValue>) => void;
}
let context: JudgeRuntimeContext | null = null;
@@ -53,6 +54,16 @@ function logger(level: "debug" | "info" | "warn" | "error", message: string, dat
ctx().logger(level, message, data);
}
/**
 * Forwards a judge diagnostics event to the publisher configured on the runtime context.
 *
 * Publishing is best-effort: with no publisher configured the call is a no-op, and an
 * exception thrown by the publisher is logged at "warn" level instead of propagating.
 *
 * @param task - Task the judge event belongs to.
 * @param type - Event type name.
 * @param payload - Event payload handed to the publisher unchanged.
 */
function publishJudgeEvent(task: QueueTask, type: string, payload: Record<string, JsonValue>): void {
  const { publishJudgeEvent: publish } = ctx();
  if (publish !== undefined) {
    try {
      publish(task, type, payload);
    } catch (failure) {
      const message = failure instanceof Error ? failure.message : String(failure);
      logger("warn", "judge_oa_event_publish_failed", { taskId: task.id, type, error: message });
    }
  }
}
/**
 * Delegates to the `safePreview` implementation supplied by the judge runtime context.
 *
 * @param value - Text to preview.
 * @param max - Optional limit forwarded unchanged to the context implementation.
 * @returns Whatever the context's `safePreview` returns for these arguments.
 */
function safePreview(value: string, max?: number): string {
  const runtime = ctx();
  // Keep the call as a method invocation so the context stays the receiver.
  return runtime.safePreview(value, max);
}
@@ -302,6 +313,7 @@ function judgePrompt(task: QueueTask, result: CodexRunResult): string {
"对于 frontend 或 WebUI 可见变更,源码编辑和 type check 不够。complete 需要证明已重建/重启或刷新已服务的 frontend bundle;当用户可见行为是验收目标时,还要有针对运行中 UniDesk frontend 的 browser/E2E/UI 验证。",
"如果 frontend/UI 任务的最终回复说 public/full E2E 未运行,或 transcript 缺少 frontend rebuild/server rebuild 加 served-UI 验证,而请求变更可能在已部署 UI bundle 中不可见,则选择 retry。",
"对于 Code Queue、backend-core、provider-gateway、frontend 或任何其他 UniDesk service/runtime 行为变更,源码编辑、TypeScript 检查和本地构建不够。complete 需要证明每个受影响的运行中 service/container/bundle 已重建或重启,并且部署后的 live API/UI 行为已验证。",
"如果最终回复同时给出已执行的容器 recreate/restart、运行中 service/image/started/healthy 证据和 live API/UI 验证,且只把 Docker mirror/build 可复现性、后续常规重建或 registry 修复作为建议,不要仅因出现“后续重建/建议修 mirror”等字样改判 retry。",
"对于 Code Queue 自身的 runtime 行为变更,不得把“等待当前 Code Queue task 结束/等待自己退出后再重启”当作完成计划或阻塞理由;这是自锁。应选择 retry,反馈要求直接重启/重建 Code Queue 并在 restart-recovery 后验证 live health/task 证据。",
"如果用户要求功能在 WebUI 可见或“上线/生效/提供展示”,不要把 rebuild/restart/deploy 当成建议。若没有运行中服务或已服务浏览器 UI 的证据,选择 retry,并要求部署加验证。",
"如果最终回复说它停下来要求用户确认如何处理一个 unexpected/concurrently modified 文件,而该文件不在 agent 必要交付范围内,则选择 retry:任务尚未自主交付完成。",
@@ -675,16 +687,36 @@ function messagePromptStats(messages: Array<{ role: string; content: string }>):
}, { promptChars: 0, promptLines: 0 });
}
export function judgeTaskInputDiagnostics(task: QueueTask, result: CodexRunResult, includePrompt = false): Record<string, JsonValue> {
const judgePromptContent = judgePrompt(task, result);
const messages = miniMaxJudgeMessages(judgePromptContent, null);
const promptStats = messagePromptStats(messages);
const body = JSON.stringify({
/**
 * Builds the MiniMax judge request for the given chat messages.
 *
 * The payload combines the configured model (`config().minimaxModel`), a fixed
 * temperature of 0, the configured `judgeMaxTokens`, and the messages as passed in.
 *
 * @param messages - Chat messages to send, in order.
 * @returns The payload object, its `JSON.stringify` serialization (`body`), the prompt
 *          statistics computed by `messagePromptStats(messages)`, and the size reported
 *          by `encodedBytes(body)`.
 */
function miniMaxJudgeRequestBody(messages: Array<{ role: "system" | "user" | "assistant"; content: string }>): {
payload: {
model: string;
temperature: number;
max_tokens: number;
messages: Array<{ role: "system" | "user" | "assistant"; content: string }>;
};
body: string;
promptStats: { promptChars: number; promptLines: number };
payloadBytes: number;
} {
const payload = {
model: config().minimaxModel,
temperature: 0,
max_tokens: config().judgeMaxTokens,
messages,
});
};
// Serialize once so the returned body and its byte size describe the same string.
const body = JSON.stringify(payload);
return {
payload,
body,
promptStats: messagePromptStats(messages),
payloadBytes: encodedBytes(body),
};
}
export function judgeTaskInputDiagnostics(task: QueueTask, result: CodexRunResult, includePrompt = false): Record<string, JsonValue> {
const judgePromptContent = judgePrompt(task, result);
const messages = miniMaxJudgeMessages(judgePromptContent, null);
const request = miniMaxJudgeRequestBody(messages);
return {
provider: "minimax",
model: config().minimaxModel,
@@ -696,9 +728,9 @@ export function judgeTaskInputDiagnostics(task: QueueTask, result: CodexRunResul
messageCount: messages.length,
judgePromptChars: judgePromptContent.length,
judgePromptLines: promptLineCount(judgePromptContent),
promptChars: promptStats.promptChars,
promptLines: promptStats.promptLines,
payloadBytes: encodedBytes(body),
promptChars: request.promptStats.promptChars,
promptLines: request.promptStats.promptLines,
payloadBytes: request.payloadBytes,
promptPreview: safePreview(judgePromptContent, 1200),
...(includePrompt ? { prompt: judgePromptContent } : {}),
};
@@ -721,14 +753,21 @@ function needsRuntimeDeploymentEvidence(text: string): boolean {
/**
 * Decides whether one line of text admits that deployment or live verification is missing.
 *
 * The line is trimmed first and a blank line never matches. A line that matches the
 * exclusion pattern is rejected before any admission pattern is tried.
 *
 * @param line - A single line of text.
 * @returns `true` when the trimmed line matches an admission pattern and not the exclusion pattern.
 */
function lineAdmitsMissingDeployment(line: string): boolean {
const normalized = line.trim();
if (normalized.length === 0) return false;
if (/(judge feedback|judge | judge||||||.*|(?:)?(?:|线||)|succeeded|healthy|verified|deployed|rebuilt)/iu.test(normalized)) return false;
return /(||(?:|)(?:||)?[^\n]{0,50}(?:server rebuild|rebuild|||线|live verification||UI |)|(?:||||)[^\n]{0,80}(?:线|||)|rebuild\/deploy as advisory|treats? rebuild\/deploy as advisory|\b(?:not|never|no)\s+(?:rebuilt?|deployed?|restarted?|verified?))/iu.test(normalized);
if (/(judge feedback|judge | judge||||||.*|(?:)?(?:|线||||)|succeeded|healthy|verified|deployed|rebuilt|force-recreate|running|live verification|)/iu.test(normalized)) return false;
const directAdmission = /(||(?:|)(?:||)?[^\n]{0,50}(?:server rebuild|rebuild|||线|live verification||UI |)|rebuild\/deploy as advisory|treats? rebuild\/deploy as advisory|\b(?:not|never|no)\s+(?:rebuilt?|deployed?|restarted?|verified?))/iu;
const futureAdmission = /(?:||||)[^\n]{0,80}(?:||||||)[^\n]{0,40}(?:线|||)|(?:||||)[^\n]{0,40}(?:线|||)[^\n]{0,60}(?:||||||)/iu;
return directAdmission.test(normalized) || futureAdmission.test(normalized);
}
/**
 * Reports whether any line of `text` is flagged by `lineAdmitsMissingDeployment`.
 *
 * @param text - Multi-line text; split on LF or CRLF line breaks.
 * @returns `true` as soon as one line is flagged, otherwise `false`.
 */
function currentFinalAdmitsMissingDeployment(text: string): boolean {
  for (const line of text.split(/\r?\n/u)) {
    if (lineAdmitsMissingDeployment(line)) return true;
  }
  return false;
}
/**
 * Checks whether `text` contains wording that the evidence pattern below recognizes,
 * such as docker compose / force-recreate output, health or status markers, or
 * trace-steps / trace-summary results.
 *
 * @param text - Text to scan; whitespace runs are collapsed to single spaces before matching.
 * @returns `true` when the collapsed text matches the evidence pattern.
 */
function currentFinalHasRuntimeDeploymentEvidence(text: string): boolean {
const normalized = text.replace(/\s+/gu, " ");
// NOTE(review): the `[^]*` gaps are unbounded, so a keyword and its confirming word may be
// arbitrarily far apart in the text — confirm this looseness is intended.
return /(docker compose[^]*\bup\s+-d[^]*(?:Started|healthy|running||Recreated|Started)|force-recreate[^]*(?:Started|running||recreated|healthy)|docker commit[^]*(?:unidesk-code-queue|code-queue)|server rebuild frontend[^]*(?:succeeded||healthy)||Live Verification|live (?:API|UI)[^]*(?:||200|ok)| served frontend|microservice (?:proxy|status)[^]*(?:200||ok|running)|trace-summary[^]*(?:|stepCount|statsSource)|trace-steps[^]*(?:total|returned|source=oa-event-flow)|status[:=][^]*(?:running|healthy)|health[:=][^]*(?:ok|healthy)|source=oa-event-flow[^]*(?:total|returned|stepCount)|started[:=][^]*\d{4}-\d{2}-\d{2})/iu.test(normalized);
}
function asksToConfirmConcurrentFileInsteadOfDelivery(text: string): boolean {
const normalized = text.replace(/\s+/gu, " ");
const asksForConfirmation = /(|[^.!?]*||please confirm (?:how|what|whether)|I need to stop[^.?!]*confirm)/iu.test(normalized);
@@ -849,7 +888,7 @@ function applyJudgeSafetyOverrides(task: QueueTask, result: CodexRunResult, judg
}
if (judge.decision !== "complete") return judge;
const scopeText = judgeScopeText(task, result);
if (!needsRuntimeDeploymentEvidence(scopeText) || !currentFinalAdmitsMissingDeployment(currentFinalText)) return judge;
if (!needsRuntimeDeploymentEvidence(scopeText) || !currentFinalAdmitsMissingDeployment(currentFinalText) || currentFinalHasRuntimeDeploymentEvidence(currentFinalText)) return judge;
const reason = "最终回复承认 runtime/UI/service 变更尚未部署到运行中服务或已服务 UI;只有源码编辑和检查还不完整。";
return {
...judge,
@@ -862,7 +901,17 @@ function applyJudgeSafetyOverrides(task: QueueTask, result: CodexRunResult, judg
}
export async function judgeTask(task: QueueTask, result: CodexRunResult): Promise<JudgeResult> {
if (config().minimaxApiKey.length === 0) return applyJudgeSafetyOverrides(task, result, fallbackJudge(result));
if (config().minimaxApiKey.length === 0) {
const judge = applyJudgeSafetyOverrides(task, result, fallbackJudge(result));
publishJudgeEvent(task, "judge-result", {
taskId: task.id,
attempt: task.currentAttempt,
provider: "fallback",
configured: false,
judge: judge as unknown as JsonValue,
});
return judge;
}
const judgePromptContent = judgePrompt(task, result);
const basePromptStats = { promptChars: judgePromptContent.length, promptLines: promptLineCount(judgePromptContent) };
try {
@@ -870,7 +919,38 @@ export async function judgeTask(task: QueueTask, result: CodexRunResult): Promis
let repairContext: JudgeRepairContext | null = null;
for (let repairAttempt = 0; repairAttempt <= config().judgeRepairAttempts; repairAttempt += 1) {
const messages = miniMaxJudgeMessages(judgePromptContent, repairContext);
const request = miniMaxJudgeRequestBody(messages);
publishJudgeEvent(task, "judge-llm-request", {
taskId: task.id,
attempt: task.currentAttempt,
repairAttempt,
requestKind: repairContext === null ? "initial" : "repair",
provider: "minimax",
model: config().minimaxModel,
apiBase: config().minimaxApiBase,
timeoutMs: config().judgeTimeoutMs,
maxTokens: config().judgeMaxTokens,
messageCount: messages.length,
judgePromptChars: judgePromptContent.length,
judgePromptLines: promptLineCount(judgePromptContent),
promptChars: request.promptStats.promptChars,
promptLines: request.promptStats.promptLines,
payloadBytes: request.payloadBytes,
// This is the exact request payload sent to MiniMax, minus headers/API key.
requestPayload: request.payload as unknown as JsonValue,
});
const response = await requestMiniMaxJudge(messages, repairAttempt);
publishJudgeEvent(task, "judge-llm-response", {
taskId: task.id,
attempt: task.currentAttempt,
repairAttempt,
provider: "minimax",
diagnostics: response.diagnostics as unknown as JsonValue,
responseContent: response.content,
responseContentPreview: safePreview(response.content, 2000),
responseRawText: response.rawText,
responseRawTextPreview: safePreview(response.rawText, 2000),
});
const preDenoiseContent = response.content;
try {
const parsedResult = parseJudgeJson(preDenoiseContent);
@@ -894,9 +974,28 @@ export async function judgeTask(task: QueueTask, result: CodexRunResult): Promis
_request: response.diagnostics as unknown as JsonValue,
},
};
return applyJudgeSafetyOverrides(task, result, judge);
const finalJudge = applyJudgeSafetyOverrides(task, result, judge);
publishJudgeEvent(task, "judge-result", {
taskId: task.id,
attempt: task.currentAttempt,
repairAttempt,
provider: "minimax",
parseSource: parsedResult.source,
minimaxJudge: judge as unknown as JsonValue,
judge: finalJudge as unknown as JsonValue,
safetyOverride: extractString(extractRecord(finalJudge.raw), "_safetyOverride") ?? null,
});
return finalJudge;
} catch (error) {
lastParseError = error instanceof Error ? error.message : String(error);
publishJudgeEvent(task, "judge-json-parse-error", {
taskId: task.id,
attempt: task.currentAttempt,
repairAttempt,
provider: "minimax",
error: safePreview(lastParseError, 1200),
preDenoiseResponsePreview: safePreview(preDenoiseContent, 2000),
});
if (repairAttempt >= config().judgeRepairAttempts) {
throw new MiniMaxJudgeFailure(minimaxFailureDetails({
stage: /continuePrompt exceeds source budget/iu.test(lastParseError) ? "validation" : "parse",
@@ -943,7 +1042,17 @@ export async function judgeTask(task: QueueTask, result: CodexRunResult): Promis
repairAttempt: detail.repairAttempt,
maxRepairAttempts: detail.maxRepairAttempts,
} as unknown as JsonValue);
return applyJudgeSafetyOverrides(task, result, fallbackJudge(result, detail));
const judge = applyJudgeSafetyOverrides(task, result, fallbackJudge(result, detail));
publishJudgeEvent(task, "judge-result", {
taskId: task.id,
attempt: task.currentAttempt,
provider: "fallback",
configured: true,
minimaxFailure: detail as unknown as JsonValue,
judge: judge as unknown as JsonValue,
safetyOverride: extractString(extractRecord(judge.raw), "_safetyOverride") ?? null,
});
return judge;
}
}
@@ -951,14 +1060,8 @@ async function requestMiniMaxJudge(messages: Array<{ role: "system" | "user" | "
const controller = new AbortController();
let timedOut = false;
const startedAt = Date.now();
const promptStats = messagePromptStats(messages);
const body = JSON.stringify({
model: config().minimaxModel,
temperature: 0,
max_tokens: config().judgeMaxTokens,
messages,
});
const payloadBytes = encodedBytes(body);
const request = miniMaxJudgeRequestBody(messages);
const { body, promptStats, payloadBytes } = request;
const timer = setTimeout(() => {
timedOut = true;
controller.abort();
@@ -321,6 +321,30 @@ export function publishCodeQueueQueueUpdated(queueId: string, reason: string): v
});
}
/**
 * Publishes a Code Queue judge diagnostics event through `postOaEvent`.
 *
 * @param task - Task the judge ran for; its id is used as aggregate id and correlation id.
 * @param queueId - Queue id used for tagging and echoed in the event payload.
 * @param type - Event type; a blank or whitespace-only value falls back to "judge-diagnostics".
 * @param payload - Event-specific fields merged into the published payload.
 */
export function publishCodeQueueJudgeEvent(task: QueueTask, queueId: string, type: string, payload: JsonRecord): void {
  const at = ctx().nowIso();
  const eventType = type.trim() || "judge-diagnostics";
  // Prefer the attempt carried by the payload when `positiveInteger` accepts it; otherwise
  // fall back to the task's current attempt, then its attempt count, then 0 ("unknown").
  const attempt = positiveInteger(payload.attempt) ?? (task.currentAttempt || task.attempts.length || 0);
  postOaEvent({
    eventId: `code-queue:${eventType}:${task.id}:${attempt}:${payload.repairAttempt ?? "na"}:${at}:${hash(payload)}`,
    type: eventType,
    createdAt: at,
    sourceKind: "service",
    sourceId: "code-queue",
    aggregateType: "task",
    aggregateId: task.id,
    correlationId: task.id,
    // An attempt of 0 means "unknown", so no attempt value is passed to the tag builder.
    tags: taskTags(task, queueId, ["judge", "diagnostics"], attempt > 0 ? attempt : null),
    payload: {
      ...payload,
      // Envelope fields are written after the spread so the payload always agrees with the
      // eventId and tags. Previously a missing or invalid `payload.attempt` overwrote the
      // normalized attempt computed above.
      taskId: task.id,
      queueId,
      attempt,
      createdAt: at,
    },
  });
}
export async function readOaTraceStatsForScopeIds(scopeIds: string[]): Promise<Map<string, OaTraceStats>> {
const result = new Map<string, OaTraceStats>();
const uniqueScopeIds = Array.from(new Set(scopeIds.map((id) => String(id || "").trim()).filter(Boolean)));