fix: keep reusable session runner alive

This commit is contained in:
lyon
2026-06-22 11:31:05 +08:00
parent a73a5cd83f
commit 88a011cf6a
5 changed files with 103 additions and 13 deletions
+17
View File
@@ -477,6 +477,23 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
return result.rows.map(eventFromRow); return result.rows.map(eventFromRow);
} }
async listEventsForCommand(runId: string, commandId: string, limit: number): Promise<RunEvent[]> {
await this.getRun(runId);
const result = await this.pool.query(
`SELECT * FROM agentrun_events
WHERE run_id = $1
AND (
payload->>'commandId' = $2
OR payload->>'targetCommandId' = $2
OR (type = 'terminal_status' AND NOT (payload ? 'commandId'))
)
ORDER BY seq ASC
LIMIT $3`,
[runId, commandId, clamp(limit, 1, 2_000)],
);
return result.rows.map(eventFromRow);
}
async createCommand(runId: string, input: CreateCommandInput): Promise<CommandRecord> { async createCommand(runId: string, input: CreateCommandInput): Promise<CommandRecord> {
const payloadHash = stableHash(input.payload); const payloadHash = stableHash(input.payload);
return this.withTransaction(async (client) => { return this.withTransaction(async (client) => {
+8 -2
View File
@@ -79,7 +79,7 @@ export async function reconcileRunnerJobsOnce(input: RunnerReconcilerOptions): P
if (terminalOutbox) observation = mergeTerminalOutboxObservation(observation, terminalOutbox); if (terminalOutbox) observation = mergeTerminalOutboxObservation(observation, terminalOutbox);
await input.store.updateRunnerJobResult(job.id, { observation }); await input.store.updateRunnerJobResult(job.id, { observation });
if (stringValue(observation.category) === "runner-job-observe-failed") observeFailedCount++; if (stringValue(observation.category) === "runner-job-observe-failed") observeFailedCount++;
const runClosure = await closeOpenRunWhenCommandTerminal(input.store, job); const runClosure = await closeOpenRunWhenCommandTerminal(input.store, job, observation);
if (runClosure.closed === true) runClosureCount++; if (runClosure.closed === true) runClosureCount++;
items.push({ items.push({
runnerJobId: job.id, runnerJobId: job.id,
@@ -295,7 +295,7 @@ function observationFromObjects(job: RunnerJobRecord, namespace: string, observe
}; };
} }
async function closeOpenRunWhenCommandTerminal(store: AgentRunStore, job: RunnerJobRecord): Promise<JsonRecord> { async function closeOpenRunWhenCommandTerminal(store: AgentRunStore, job: RunnerJobRecord, observation: JsonRecord): Promise<JsonRecord> {
let command: CommandRecord; let command: CommandRecord;
try { try {
command = await store.getCommand(job.commandId); command = await store.getCommand(job.commandId);
@@ -303,6 +303,8 @@ async function closeOpenRunWhenCommandTerminal(store: AgentRunStore, job: Runner
return { closed: false, state: "command-missing", valuesPrinted: false }; return { closed: false, state: "command-missing", valuesPrinted: false };
} }
if (!isTerminalCommandState(command.state)) return { closed: false, state: "command-open", commandState: command.state, valuesPrinted: false }; if (!isTerminalCommandState(command.state)) return { closed: false, state: "command-open", commandState: command.state, valuesPrinted: false };
const observedRunnerPhase = stringValue(observation.observedRunnerPhase) ?? stringValue(observation.phase);
if (!runnerJobAllowsRunClosure(observedRunnerPhase)) return { closed: false, state: "runner-job-still-active", observedRunnerPhase: observedRunnerPhase ?? "unknown", commandState: command.state, valuesPrinted: false };
const run = await store.getRun(job.runId); const run = await store.getRun(job.runId);
if (isTerminalRunStatus(run.status)) return { closed: false, state: "run-terminal", runStatus: run.status, commandState: command.state, valuesPrinted: false }; if (isTerminalRunStatus(run.status)) return { closed: false, state: "run-terminal", runStatus: run.status, commandState: command.state, valuesPrinted: false };
const terminalStatus = terminalStatusFromCommand(command); const terminalStatus = terminalStatusFromCommand(command);
@@ -312,6 +314,10 @@ async function closeOpenRunWhenCommandTerminal(store: AgentRunStore, job: Runner
return { closed: true, state: "closed-open-run", runStatus: next.status, terminalStatus, commandState: command.state, valuesPrinted: false }; return { closed: true, state: "closed-open-run", runStatus: next.status, terminalStatus, commandState: command.state, valuesPrinted: false };
} }
function runnerJobAllowsRunClosure(phase: string | null): boolean {
return phase === "k8s:succeeded" || phase === "k8s:failed" || phase === "k8s:missing";
}
function terminalStatusFromCommand(command: CommandRecord): TerminalStatus { function terminalStatusFromCommand(command: CommandRecord): TerminalStatus {
if (command.state === "completed") return "completed"; if (command.state === "completed") return "completed";
if (command.state === "cancelled") return "cancelled"; if (command.state === "cancelled") return "cancelled";
+51 -8
View File
@@ -436,7 +436,7 @@ function compactRecoveryActions(value: JsonValue | undefined): JsonValue[] {
} }
function commandResultOtelAttributes(result: JsonValue): JsonRecord { function commandResultOtelAttributes(result: JsonValue): JsonRecord {
const record = asJsonRecord(result); const record = asJsonRecord(result) ?? {};
const terminalClassification = asJsonRecord(record.terminalClassification); const terminalClassification = asJsonRecord(record.terminalClassification);
const diagnosis = asJsonRecord(record.diagnosis); const diagnosis = asJsonRecord(record.diagnosis);
const runnerJob = asJsonRecord(diagnosis?.runnerJob); const runnerJob = asJsonRecord(diagnosis?.runnerJob);
@@ -771,8 +771,8 @@ async function route({ method, url, body, store, sourceCommit, authSummary, runn
const runId = runnerJobMatch[1] ?? ""; const runId = runnerJobMatch[1] ?? "";
const commandId = url.searchParams.get("commandId") ?? undefined; const commandId = url.searchParams.get("commandId") ?? undefined;
const jobs = await store.listRunnerJobs(runId, commandId); const jobs = await store.listRunnerJobs(runId, commandId);
const events = await store.listEvents(runId, 0, 500); const items = await Promise.all(jobs.map(async (job) => runnerJobStatusSummary(job, await store.listEventsForCommand(runId, job.commandId, 2_000))));
return { items: jobs.map((job) => runnerJobStatusSummary(job, events)), count: jobs.length, lastSeq: events.at(-1)?.seq ?? 0 }; return { items, count: jobs.length };
} }
const runnerJobShowMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/runner-jobs\/([^/]+)$/u); const runnerJobShowMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/runner-jobs\/([^/]+)$/u);
if (method === "GET" && runnerJobShowMatch) { if (method === "GET" && runnerJobShowMatch) {
@@ -781,7 +781,7 @@ async function route({ method, url, body, store, sourceCommit, authSummary, runn
const jobs = await store.listRunnerJobs(runId); const jobs = await store.listRunnerJobs(runId);
const job = jobs.find((item) => item.id === runnerJobId); const job = jobs.find((item) => item.id === runnerJobId);
if (!job) throw new AgentRunError("schema-invalid", `runner job ${runnerJobId} was not found`, { httpStatus: 404 }); if (!job) throw new AgentRunError("schema-invalid", `runner job ${runnerJobId} was not found`, { httpStatus: 404 });
return runnerJobStatusSummary(job, await store.listEvents(runId, 0, 500)) as JsonValue; return runnerJobStatusSummary(job, await store.listEventsForCommand(runId, job.commandId, 2_000)) as JsonValue;
} }
if (method === "POST" && path === "/api/v1/reconciler/runner-jobs") { if (method === "POST" && path === "/api/v1/reconciler/runner-jobs") {
const record = body === null ? {} : asRecord(body, "runnerReconciler"); const record = body === null ? {} : asRecord(body, "runnerReconciler");
@@ -892,6 +892,19 @@ async function sendToSession(input: { store: AgentRunStore; sessionId: string; b
return sessionSendResponse({ sessionId: input.sessionId, decision: "steer", run: active.run, command, runnerJob: null, activeBefore: active, dryRun: false }); return sessionSendResponse({ sessionId: input.sessionId, decision: "steer", run: active.run, command, runnerJob: null, activeBefore: active, dryRun: false });
} }
const idleRun = existing ? await idleReceivableRun(input.store, existing) : null;
if (idleRun) {
const commandBody: JsonRecord = { type: "turn", payload };
if (commandIdempotencyKey) commandBody.idempotencyKey = commandIdempotencyKey;
const request = { method: "POST", path: `/api/v1/runs/${idleRun.run.id}/commands`, commandType: "turn", payloadBytes: jsonByteLength(payload), reusedIdleRun: true, valuesPrinted: false };
if (dryRun) return sessionSendPlan(input.sessionId, "turn", active, request, null, idleRunSummary(idleRun));
const command = await input.store.createCommand(idleRun.run.id, validateCreateCommand(commandBody));
const run = await input.store.getRun(idleRun.run.id);
void emitAgentRunOtelSpan("command_created", run, process.env, { command, startTimeMs: startedAt, kind: 2, attributes: { "http.method": "POST", "http.route": "/api/v1/sessions/:sessionId/send", "http.status_code": 200, decision: "turn", reusedIdleRun: true, commandType: command.type, commandState: command.state } });
void emitAgentRunOtelSpan("session_send", run, process.env, { command, startTimeMs: startedAt, kind: 2, attributes: { "http.method": "POST", "http.route": "/api/v1/sessions/:sessionId/send", "http.status_code": 200, decision: "turn", createRunnerJob: false, reusedIdleRun: true } });
return sessionSendResponse({ sessionId: input.sessionId, decision: "turn", run, command, runnerJob: null, activeBefore: active, reusedIdleRun: idleRunSummary(idleRun), dryRun: false });
}
const runRecord = asRecord(record.run ?? record.runBase ?? null, "sessionSend.run"); const runRecord = asRecord(record.run ?? record.runBase ?? null, "sessionSend.run");
const runBody = sessionSendRunBody(input.sessionId, runRecord); const runBody = sessionSendRunBody(input.sessionId, runRecord);
const commandBody: JsonRecord = { type: "turn", payload }; const commandBody: JsonRecord = { type: "turn", payload };
@@ -908,7 +921,7 @@ async function sendToSession(input: { store: AgentRunStore; sessionId: string; b
runnerJobBytes: createRunnerJob ? jsonByteLength(runnerJobBody) : 0, runnerJobBytes: createRunnerJob ? jsonByteLength(runnerJobBody) : 0,
valuesPrinted: false, valuesPrinted: false,
}; };
if (dryRun) return sessionSendPlan(input.sessionId, "turn", active, request, runBody); if (dryRun) return sessionSendPlan(input.sessionId, "turn", active, request, runBody, null);
const run = await input.store.createRun(validateCreateRun(runBody)); const run = await input.store.createRun(validateCreateRun(runBody));
void emitAgentRunOtelSpan("run_created", run, process.env, { startTimeMs: startedAt, kind: 2, attributes: { "http.method": "POST", "http.route": "/api/v1/sessions/:sessionId/send", "http.status_code": 200, decision: "turn", backendProfile: run.backendProfile, providerId: run.providerId } }); void emitAgentRunOtelSpan("run_created", run, process.env, { startTimeMs: startedAt, kind: 2, attributes: { "http.method": "POST", "http.route": "/api/v1/sessions/:sessionId/send", "http.status_code": 200, decision: "turn", backendProfile: run.backendProfile, providerId: run.providerId } });
const command = await input.store.createCommand(run.id, validateCreateCommand(commandBody)); const command = await input.store.createCommand(run.id, validateCreateCommand(commandBody));
@@ -925,7 +938,7 @@ async function sendToSession(input: { store: AgentRunStore; sessionId: string; b
void emitAgentRunOtelSpan("runner_job_created", run, process.env, { command, startTimeMs: startedAt, kind: 2, attributes: { "http.method": "POST", "http.route": "/api/v1/sessions/:sessionId/send", "http.status_code": 200, decision: "turn", jobName: stringJsonValue(runnerJobRecord?.jobName), namespace: stringJsonValue(runnerJobRecord?.namespace) } }); void emitAgentRunOtelSpan("runner_job_created", run, process.env, { command, startTimeMs: startedAt, kind: 2, attributes: { "http.method": "POST", "http.route": "/api/v1/sessions/:sessionId/send", "http.status_code": 200, decision: "turn", jobName: stringJsonValue(runnerJobRecord?.jobName), namespace: stringJsonValue(runnerJobRecord?.namespace) } });
} }
void emitAgentRunOtelSpan("session_send", run, process.env, { command, startTimeMs: startedAt, kind: 2, attributes: { "http.method": "POST", "http.route": "/api/v1/sessions/:sessionId/send", "http.status_code": 200, decision: "turn", createRunnerJob } }); void emitAgentRunOtelSpan("session_send", run, process.env, { command, startTimeMs: startedAt, kind: 2, attributes: { "http.method": "POST", "http.route": "/api/v1/sessions/:sessionId/send", "http.status_code": 200, decision: "turn", createRunnerJob } });
return sessionSendResponse({ sessionId: input.sessionId, decision: "turn", run, command, runnerJob, activeBefore: active, dryRun: false }); return sessionSendResponse({ sessionId: input.sessionId, decision: "turn", run, command, runnerJob, activeBefore: active, reusedIdleRun: null, dryRun: false });
} }
async function activeReceivableCommand(store: AgentRunStore, session: SessionRecord): Promise<{ run: RunRecord; command: CommandRecord; reason: string; leaseExpired: boolean } | null> { async function activeReceivableCommand(store: AgentRunStore, session: SessionRecord): Promise<{ run: RunRecord; command: CommandRecord; reason: string; leaseExpired: boolean } | null> {
@@ -941,6 +954,21 @@ async function activeReceivableCommand(store: AgentRunStore, session: SessionRec
return { run, command, reason: "active-turn-running", leaseExpired }; return { run, command, reason: "active-turn-running", leaseExpired };
} }
async function idleReceivableRun(store: AgentRunStore, session: SessionRecord): Promise<{ run: RunRecord; lastCommand: CommandRecord; reason: string; leaseExpired: boolean } | null> {
if (!session.lastRunId || !session.lastCommandId) return null;
try {
const [run, lastCommand] = await Promise.all([store.getRun(session.lastRunId), store.getCommand(session.lastCommandId)]);
if (run.sessionRef?.sessionId !== session.sessionId || lastCommand.runId !== run.id) return null;
if (runIsTerminal(run) || !commandIsTerminal(lastCommand)) return null;
const leaseExpired = run.leaseExpiresAt ? Date.parse(run.leaseExpiresAt) <= Date.now() : false;
if (leaseExpired) return null;
if (run.status !== "claimed" && run.status !== "running") return null;
return { run, lastCommand, reason: "idle-run-reusable", leaseExpired };
} catch {
return null;
}
}
function sessionSendPayload(record: JsonRecord): JsonRecord { function sessionSendPayload(record: JsonRecord): JsonRecord {
const payload = asJsonRecord(record.payload); const payload = asJsonRecord(record.payload);
if (payload) return payload; if (payload) return payload;
@@ -955,7 +983,7 @@ function sessionSendRunBody(sessionId: string, runRecord: JsonRecord): JsonRecor
return { ...runRecord, sessionRef: { ...sessionRef, sessionId, metadata } }; return { ...runRecord, sessionRef: { ...sessionRef, sessionId, metadata } };
} }
function sessionSendPlan(sessionId: string, decision: "steer" | "turn", active: Awaited<ReturnType<typeof activeReceivableCommand>>, request: JsonRecord, runBody: JsonRecord | null): JsonRecord { function sessionSendPlan(sessionId: string, decision: "steer" | "turn", active: Awaited<ReturnType<typeof activeReceivableCommand>>, request: JsonRecord, runBody: JsonRecord | null, reusedIdleRun: JsonRecord | null = null): JsonRecord {
return { return {
action: "session-send-plan", action: "session-send-plan",
dryRun: true, dryRun: true,
@@ -964,6 +992,7 @@ function sessionSendPlan(sessionId: string, decision: "steer" | "turn", active:
decision, decision,
internalCommandType: decision, internalCommandType: decision,
activeBefore: active ? activeBeforeSummary(active) : null, activeBefore: active ? activeBeforeSummary(active) : null,
reusedIdleRun,
request, request,
...(runBody ? { run: { bodyBytes: jsonByteLength(runBody), sessionRef: summarizeSendSessionRef(runBody), valuesPrinted: false } } : {}), ...(runBody ? { run: { bodyBytes: jsonByteLength(runBody), sessionRef: summarizeSendSessionRef(runBody), valuesPrinted: false } } : {}),
next: { confirm: managerActionDescriptor({ action: "send-session", operation: "send", resourceKind: "session", resourceName: sessionId, sessionId, inputKind: "prompt" }), note: "Remove --dry-run to perform the mutation. Manager will decide internal steer vs turn from durable session state." }, next: { confirm: managerActionDescriptor({ action: "send-session", operation: "send", resourceKind: "session", resourceName: sessionId, sessionId, inputKind: "prompt" }), note: "Remove --dry-run to perform the mutation. Manager will decide internal steer vs turn from durable session state." },
@@ -971,7 +1000,7 @@ function sessionSendPlan(sessionId: string, decision: "steer" | "turn", active:
}; };
} }
function sessionSendResponse(input: { sessionId: string; decision: "steer" | "turn"; run: RunRecord; command: CommandRecord; runnerJob: JsonValue; activeBefore: Awaited<ReturnType<typeof activeReceivableCommand>>; dryRun: false }): JsonRecord { function sessionSendResponse(input: { sessionId: string; decision: "steer" | "turn"; run: RunRecord; command: CommandRecord; runnerJob: JsonValue; activeBefore: Awaited<ReturnType<typeof activeReceivableCommand>>; reusedIdleRun?: JsonRecord | null; dryRun: false }): JsonRecord {
return { return {
action: "session-send", action: "session-send",
dryRun: input.dryRun, dryRun: input.dryRun,
@@ -983,6 +1012,7 @@ function sessionSendResponse(input: { sessionId: string; decision: "steer" | "tu
command: input.command as unknown as JsonRecord, command: input.command as unknown as JsonRecord,
runnerJob: input.runnerJob, runnerJob: input.runnerJob,
activeBefore: input.activeBefore ? activeBeforeSummary(input.activeBefore) : null, activeBefore: input.activeBefore ? activeBeforeSummary(input.activeBefore) : null,
reusedIdleRun: input.reusedIdleRun ?? null,
pollActions: [ pollActions: [
managerActionDescriptor({ action: "inspect-session", operation: "describe", resourceKind: "session", resourceName: input.sessionId, sessionId: input.sessionId, readerId: "cli" }), managerActionDescriptor({ action: "inspect-session", operation: "describe", resourceKind: "session", resourceName: input.sessionId, sessionId: input.sessionId, readerId: "cli" }),
managerActionDescriptor({ action: "poll-trace", operation: "events", resourceKind: "run", resourceName: input.run.id, runId: input.run.id, commandId: input.command.id, sessionId: input.sessionId, afterSeq: 0, limit: 100 }), managerActionDescriptor({ action: "poll-trace", operation: "events", resourceKind: "run", resourceName: input.run.id, runId: input.run.id, commandId: input.command.id, sessionId: input.sessionId, afterSeq: 0, limit: 100 }),
@@ -1016,6 +1046,19 @@ function activeBeforeSummary(active: NonNullable<Awaited<ReturnType<typeof activ
return { runId: active.run.id, commandId: active.command.id, commandState: active.command.state, runStatus: active.run.status, leaseExpiresAt: active.run.leaseExpiresAt, leaseExpired: active.leaseExpired, reason: active.reason, valuesPrinted: false }; return { runId: active.run.id, commandId: active.command.id, commandState: active.command.state, runStatus: active.run.status, leaseExpiresAt: active.run.leaseExpiresAt, leaseExpired: active.leaseExpired, reason: active.reason, valuesPrinted: false };
} }
function idleRunSummary(idleRun: NonNullable<Awaited<ReturnType<typeof idleReceivableRun>>>): JsonRecord {
return {
runId: idleRun.run.id,
commandId: idleRun.lastCommand.id,
runStatus: idleRun.run.status,
commandState: idleRun.lastCommand.state,
leaseExpiresAt: idleRun.run.leaseExpiresAt,
leaseExpired: idleRun.leaseExpired,
reason: idleRun.reason,
valuesPrinted: false,
};
}
function summarizeSendSessionRef(runBody: JsonRecord): JsonRecord { function summarizeSendSessionRef(runBody: JsonRecord): JsonRecord {
const ref = asJsonRecord(runBody.sessionRef) ?? {}; const ref = asJsonRecord(runBody.sessionRef) ?? {};
return { sessionId: optionalString(ref.sessionId), conversationId: optionalString(ref.conversationId), threadId: optionalString(ref.threadId), metadataKeys: Object.keys(asJsonRecord(ref.metadata) ?? {}).sort(), valuesPrinted: false }; return { sessionId: optionalString(ref.sessionId), conversationId: optionalString(ref.conversationId), threadId: optionalString(ref.threadId), metadataKeys: Object.keys(asJsonRecord(ref.metadata) ?? {}).sort(), valuesPrinted: false };
+16 -1
View File
@@ -23,6 +23,7 @@ export interface AgentRunStore {
createRun(input: CreateRunInput): MaybePromise<RunRecord>; createRun(input: CreateRunInput): MaybePromise<RunRecord>;
getRun(runId: string): MaybePromise<RunRecord>; getRun(runId: string): MaybePromise<RunRecord>;
listEvents(runId: string, afterSeq: number, limit: number): MaybePromise<RunEvent[]>; listEvents(runId: string, afterSeq: number, limit: number): MaybePromise<RunEvent[]>;
listEventsForCommand(runId: string, commandId: string, limit: number): MaybePromise<RunEvent[]>;
createCommand(runId: string, input: CreateCommandInput): MaybePromise<CommandRecord>; createCommand(runId: string, input: CreateCommandInput): MaybePromise<CommandRecord>;
getCommand(commandId: string): MaybePromise<CommandRecord>; getCommand(commandId: string): MaybePromise<CommandRecord>;
listCommands(runId: string, afterSeq: number, limit: number): MaybePromise<CommandRecord[]>; listCommands(runId: string, afterSeq: number, limit: number): MaybePromise<CommandRecord[]>;
@@ -111,7 +112,8 @@ export async function openAgentRunStoreFromEnv(env: NodeJS.ProcessEnv = process.
const databaseUrl = env.DATABASE_URL?.trim(); const databaseUrl = env.DATABASE_URL?.trim();
if (databaseUrl) { if (databaseUrl) {
const { createPostgresAgentRunStore } = await import("./postgres-store.js"); const { createPostgresAgentRunStore } = await import("./postgres-store.js");
return createPostgresAgentRunStore({ connectionString: databaseUrl, poolMax: optionalPositiveIntegerEnv(env, "AGENTRUN_POSTGRES_POOL_MAX") }); const poolMax = optionalPositiveIntegerEnv(env, "AGENTRUN_POSTGRES_POOL_MAX");
return createPostgresAgentRunStore({ connectionString: databaseUrl, ...(poolMax !== undefined ? { poolMax } : {}) });
} }
const storeMode = env.AGENTRUN_STORE ?? env.AGENTRUN_MGR_STORE; const storeMode = env.AGENTRUN_STORE ?? env.AGENTRUN_MGR_STORE;
if (storeMode === "memory") return new MemoryAgentRunStore(); if (storeMode === "memory") return new MemoryAgentRunStore();
@@ -167,6 +169,14 @@ export class MemoryAgentRunStore implements AgentRunStore {
return (this.eventsByRun.get(runId) ?? []).filter((event) => event.seq > afterSeq).slice(0, Math.max(1, Math.min(limit, 500))); return (this.eventsByRun.get(runId) ?? []).filter((event) => event.seq > afterSeq).slice(0, Math.max(1, Math.min(limit, 500)));
} }
listEventsForCommand(runId: string, commandId: string, limit: number): RunEvent[] {
this.getRun(runId);
const clamped = Math.max(1, Math.min(limit, 2_000));
return (this.eventsByRun.get(runId) ?? [])
.filter((event) => eventMatchesCommand(event, commandId))
.slice(0, clamped);
}
createCommand(runId: string, input: CreateCommandInput): CommandRecord { createCommand(runId: string, input: CreateCommandInput): CommandRecord {
const run = this.getRun(runId); const run = this.getRun(runId);
if (isTerminalRunStatus(run.status)) throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${runId} is already terminal: ${run.status}`, { httpStatus: 409 }); if (isTerminalRunStatus(run.status)) throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${runId} is already terminal: ${run.status}`, { httpStatus: 409 });
@@ -867,6 +877,11 @@ export function sessionTitleFromCommand(command: CommandRecord): string | null {
return value.trim().replace(/\s+/gu, " ").slice(0, 120) || null; return value.trim().replace(/\s+/gu, " ").slice(0, 120) || null;
} }
function eventMatchesCommand(event: RunEvent, commandId: string): boolean {
const payload = event.payload;
return payload.commandId === commandId || payload.targetCommandId === commandId || (event.type === "terminal_status" && payload.commandId === undefined);
}
export function isSessionOutputEvent(event: RunEvent): boolean { export function isSessionOutputEvent(event: RunEvent): boolean {
return event.type === "assistant_message" || event.type === "command_output" || event.type === "diff" || event.type === "error" || event.type === "terminal_status"; return event.type === "assistant_message" || event.type === "command_output" || event.type === "diff" || event.type === "error" || event.type === "terminal_status";
} }
+11 -2
View File
@@ -131,15 +131,17 @@ export async function runOnce(options: RunnerOnceOptions): Promise<JsonRecord> {
try { try {
let idleSince = Date.now(); let idleSince = Date.now();
let firstPoll = true; let firstPoll = true;
let commandPollAfterSeq = 0;
while (true) { while (true) {
const currentRun = await api.getRun(options.runId); const currentRun = await api.getRun(options.runId);
if (isTerminalRun(currentRun)) return { runner, claimed, terminalStatus: currentRun.terminalStatus, failureKind: currentRun.failureKind, run: currentRun, commandsProcessed: commandResults.length, commandResults, stopped: "run-terminal" } as JsonRecord; if (isTerminalRun(currentRun)) return { runner, claimed, terminalStatus: currentRun.terminalStatus, failureKind: currentRun.failureKind, run: currentRun, commandsProcessed: commandResults.length, commandResults, stopped: "run-terminal" } as JsonRecord;
const commandsResponse = await api.pollCommands(options.runId, { afterSeq: 0, limit: 20, ...(firstPoll && options.commandId ? { commandId: options.commandId } : {}) }); const commandsResponse = await api.pollCommands(options.runId, { afterSeq: commandPollAfterSeq, limit: 50, ...(firstPoll && options.commandId ? { commandId: options.commandId } : {}) });
firstPoll = false; firstPoll = false;
const command = commandsResponse.selected; const command = commandsResponse.selected;
if (!command) { if (!command) {
await failPendingSteerWithoutActiveTurn(api, options.runId, commandsResponse.items, runner.id, attemptId, runnerLog, options.runnerJobId); await failPendingSteerWithoutActiveTurn(api, options.runId, commandsResponse.items, runner.id, attemptId, runnerLog, options.runnerJobId);
commandPollAfterSeq = Math.max(commandPollAfterSeq, lastCommandSeq(commandsResponse.items));
if (options.oneShot === true) return noPendingResult(runner, claimed, commandResults, commandsResponse.items.length, "one-shot-no-pending"); if (options.oneShot === true) return noPendingResult(runner, claimed, commandResults, commandsResponse.items.length, "one-shot-no-pending");
if (Date.now() - idleSince >= idleTimeoutMs) return noPendingResult(runner, claimed, commandResults, commandsResponse.items.length, "idle-timeout"); if (Date.now() - idleSince >= idleTimeoutMs) return noPendingResult(runner, claimed, commandResults, commandsResponse.items.length, "idle-timeout");
await sleep(pollIntervalMs); await sleep(pollIntervalMs);
@@ -171,6 +173,7 @@ export async function runOnce(options: RunnerOnceOptions): Promise<JsonRecord> {
? await reportCommandFailure(api, options.runId, command.id, runner, attemptId, materializationFailure, "runner:resource-bundle", runnerLog, { terminalRun: true, ...(options.runnerJobId ? { runnerJobId: options.runnerJobId } : {}) }) ? await reportCommandFailure(api, options.runId, command.id, runner, attemptId, materializationFailure, "runner:resource-bundle", runnerLog, { terminalRun: true, ...(options.runnerJobId ? { runnerJobId: options.runnerJobId } : {}) })
: await executeCommand(api, withResourceAssembly(options, resourceEnv, initialPrompt), command, runner, attemptId, workspacePath, backendSession ?? (backendSession = createBackendSession(currentRun, withResourceAssembly(options, resourceEnv, initialPrompt))), runnerLog); : await executeCommand(api, withResourceAssembly(options, resourceEnv, initialPrompt), command, runner, attemptId, workspacePath, backendSession ?? (backendSession = createBackendSession(currentRun, withResourceAssembly(options, resourceEnv, initialPrompt))), runnerLog);
commandResults.push(result); commandResults.push(result);
commandPollAfterSeq = Math.max(commandPollAfterSeq, command.seq);
idleSince = Date.now(); idleSince = Date.now();
if (options.oneShot === true) { if (options.oneShot === true) {
const run = await api.reportStatus(options.runId, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: null }); const run = await api.reportStatus(options.runId, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: null });
@@ -317,7 +320,7 @@ async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions,
}; };
await appendBestEffort(api, options.runId, { type: "error", payload: retryPayload }); await appendBestEffort(api, options.runId, { type: "error", payload: retryPayload });
await appendBestEffort(api, options.runId, { type: "backend_status", payload: retryPayload }); await appendBestEffort(api, options.runId, { type: "backend_status", payload: retryPayload });
await runnerLog.write("command.retrying", { runId: options.runId, commandId: command.id, runnerId: runner.id, runnerJobId: options.runnerJobId ?? null, ...retryPayload, valuesPrinted: false }); await runnerLog.write("command.retrying", { runId: options.runId, ...retryPayload, valuesPrinted: false });
if (backendSession) { if (backendSession) {
const closeEvents = await backendSession.close(); const closeEvents = await backendSession.close();
for (const event of closeEvents) await appendBestEffort(api, options.runId, annotateCommandEvent(event, command.id, attemptId, runner.id)); for (const event of closeEvents) await appendBestEffort(api, options.runId, annotateCommandEvent(event, command.id, attemptId, runner.id));
@@ -720,6 +723,12 @@ function noPendingResult(runner: RunnerRecord, claimed: RunRecord, commandResult
return { runner, claimed, terminalStatus: "blocked", failureKind: "schema-invalid", commandsProcessed: 0, commandResults, polledCommands, stopped }; return { runner, claimed, terminalStatus: "blocked", failureKind: "schema-invalid", commandsProcessed: 0, commandResults, polledCommands, stopped };
} }
function lastCommandSeq(commands: CommandRecord[]): number {
let seq = 0;
for (const command of commands) seq = Math.max(seq, command.seq);
return seq;
}
function sleep(ms: number): Promise<void> { function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms)); return new Promise((resolve) => setTimeout(resolve, ms));
} }