From 88a011cf6a1ba5c2b58a07145149c2267912d95a Mon Sep 17 00:00:00 2001 From: lyon Date: Mon, 22 Jun 2026 11:31:05 +0800 Subject: [PATCH] fix: keep reusable session runner alive --- src/mgr/postgres-store.ts | 17 +++++++++++ src/mgr/runner-reconciler.ts | 10 ++++-- src/mgr/server.ts | 59 +++++++++++++++++++++++++++++++----- src/mgr/store.ts | 17 ++++++++++- src/runner/run-once.ts | 13 ++++++-- 5 files changed, 103 insertions(+), 13 deletions(-) diff --git a/src/mgr/postgres-store.ts b/src/mgr/postgres-store.ts index aa76728..7f93466 100644 --- a/src/mgr/postgres-store.ts +++ b/src/mgr/postgres-store.ts @@ -477,6 +477,23 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations ( return result.rows.map(eventFromRow); } + async listEventsForCommand(runId: string, commandId: string, limit: number): Promise { + await this.getRun(runId); + const result = await this.pool.query( + `SELECT * FROM agentrun_events + WHERE run_id = $1 + AND ( + payload->>'commandId' = $2 + OR payload->>'targetCommandId' = $2 + OR (type = 'terminal_status' AND NOT (payload ? 'commandId')) + ) + ORDER BY seq ASC + LIMIT $3`, + [runId, commandId, clamp(limit, 1, 2_000)], + ); + return result.rows.map(eventFromRow); + } + async createCommand(runId: string, input: CreateCommandInput): Promise { const payloadHash = stableHash(input.payload); return this.withTransaction(async (client) => { diff --git a/src/mgr/runner-reconciler.ts b/src/mgr/runner-reconciler.ts index 22c66ce..7553b70 100644 --- a/src/mgr/runner-reconciler.ts +++ b/src/mgr/runner-reconciler.ts @@ -79,7 +79,7 @@ export async function reconcileRunnerJobsOnce(input: RunnerReconcilerOptions): P if (terminalOutbox) observation = mergeTerminalOutboxObservation(observation, terminalOutbox); await input.store.updateRunnerJobResult(job.id, { observation }); if (stringValue(observation.category) === "runner-job-observe-failed") observeFailedCount++; - const runClosure = await closeOpenRunWhenCommandTerminal(input.store, job); + const runClosure = await closeOpenRunWhenCommandTerminal(input.store, job, observation); if (runClosure.closed === true) runClosureCount++; items.push({ runnerJobId: job.id, @@ -295,7 +295,7 @@ function observationFromObjects(job: RunnerJobRecord, namespace: string, observe }; } -async function closeOpenRunWhenCommandTerminal(store: AgentRunStore, job: RunnerJobRecord): Promise { +async function closeOpenRunWhenCommandTerminal(store: AgentRunStore, job: RunnerJobRecord, observation: JsonRecord): Promise { let command: CommandRecord; try { command = await store.getCommand(job.commandId); @@ -303,6 +303,8 @@ async function closeOpenRunWhenCommandTerminal(store: AgentRunStore, job: Runner return { closed: false, state: "command-missing", valuesPrinted: false }; } if (!isTerminalCommandState(command.state)) return { closed: false, state: "command-open", commandState: command.state, valuesPrinted: false }; + const observedRunnerPhase = stringValue(observation.observedRunnerPhase) ?? stringValue(observation.phase); + if (!runnerJobAllowsRunClosure(observedRunnerPhase)) return { closed: false, state: "runner-job-still-active", observedRunnerPhase: observedRunnerPhase ?? "unknown", commandState: command.state, valuesPrinted: false }; const run = await store.getRun(job.runId); if (isTerminalRunStatus(run.status)) return { closed: false, state: "run-terminal", runStatus: run.status, commandState: command.state, valuesPrinted: false }; const terminalStatus = terminalStatusFromCommand(command); @@ -312,6 +314,10 @@ async function closeOpenRunWhenCommandTerminal(store: AgentRunStore, job: Runner return { closed: true, state: "closed-open-run", runStatus: next.status, terminalStatus, commandState: command.state, valuesPrinted: false }; } +function runnerJobAllowsRunClosure(phase: string | null): boolean { + return phase === "k8s:succeeded" || phase === "k8s:failed" || phase === "k8s:missing"; +} + function terminalStatusFromCommand(command: CommandRecord): TerminalStatus { if (command.state === "completed") return "completed"; if (command.state === "cancelled") return "cancelled"; diff --git a/src/mgr/server.ts b/src/mgr/server.ts index c356642..275cfe5 100644 --- a/src/mgr/server.ts +++ b/src/mgr/server.ts @@ -436,7 +436,7 @@ function compactRecoveryActions(value: JsonValue | undefined): JsonValue[] { } function commandResultOtelAttributes(result: JsonValue): JsonRecord { - const record = asJsonRecord(result); + const record = asJsonRecord(result) ?? {}; const terminalClassification = asJsonRecord(record.terminalClassification); const diagnosis = asJsonRecord(record.diagnosis); const runnerJob = asJsonRecord(diagnosis?.runnerJob); @@ -771,8 +771,8 @@ async function route({ method, url, body, store, sourceCommit, authSummary, runn const runId = runnerJobMatch[1] ?? ""; const commandId = url.searchParams.get("commandId") ?? undefined; const jobs = await store.listRunnerJobs(runId, commandId); - const events = await store.listEvents(runId, 0, 500); - return { items: jobs.map((job) => runnerJobStatusSummary(job, events)), count: jobs.length, lastSeq: events.at(-1)?.seq ?? 0 }; + const items = await Promise.all(jobs.map(async (job) => runnerJobStatusSummary(job, await store.listEventsForCommand(runId, job.commandId, 2_000)))); + return { items, count: jobs.length }; } const runnerJobShowMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/runner-jobs\/([^/]+)$/u); if (method === "GET" && runnerJobShowMatch) { @@ -781,7 +781,7 @@ async function route({ method, url, body, store, sourceCommit, authSummary, runn const jobs = await store.listRunnerJobs(runId); const job = jobs.find((item) => item.id === runnerJobId); if (!job) throw new AgentRunError("schema-invalid", `runner job ${runnerJobId} was not found`, { httpStatus: 404 }); - return runnerJobStatusSummary(job, await store.listEvents(runId, 0, 500)) as JsonValue; + return runnerJobStatusSummary(job, await store.listEventsForCommand(runId, job.commandId, 2_000)) as JsonValue; } if (method === "POST" && path === "/api/v1/reconciler/runner-jobs") { const record = body === null ? {} : asRecord(body, "runnerReconciler"); @@ -892,6 +892,19 @@ async function sendToSession(input: { store: AgentRunStore; sessionId: string; b return sessionSendResponse({ sessionId: input.sessionId, decision: "steer", run: active.run, command, runnerJob: null, activeBefore: active, dryRun: false }); } + const idleRun = existing ? await idleReceivableRun(input.store, existing) : null; + if (idleRun) { + const commandBody: JsonRecord = { type: "turn", payload }; + if (commandIdempotencyKey) commandBody.idempotencyKey = commandIdempotencyKey; + const request = { method: "POST", path: `/api/v1/runs/${idleRun.run.id}/commands`, commandType: "turn", payloadBytes: jsonByteLength(payload), reusedIdleRun: true, valuesPrinted: false }; + if (dryRun) return sessionSendPlan(input.sessionId, "turn", active, request, null, idleRunSummary(idleRun)); + const command = await input.store.createCommand(idleRun.run.id, validateCreateCommand(commandBody)); + const run = await input.store.getRun(idleRun.run.id); + void emitAgentRunOtelSpan("command_created", run, process.env, { command, startTimeMs: startedAt, kind: 2, attributes: { "http.method": "POST", "http.route": "/api/v1/sessions/:sessionId/send", "http.status_code": 200, decision: "turn", reusedIdleRun: true, commandType: command.type, commandState: command.state } }); + void emitAgentRunOtelSpan("session_send", run, process.env, { command, startTimeMs: startedAt, kind: 2, attributes: { "http.method": "POST", "http.route": "/api/v1/sessions/:sessionId/send", "http.status_code": 200, decision: "turn", createRunnerJob: false, reusedIdleRun: true } }); + return sessionSendResponse({ sessionId: input.sessionId, decision: "turn", run, command, runnerJob: null, activeBefore: active, reusedIdleRun: idleRunSummary(idleRun), dryRun: false }); + } + const runRecord = asRecord(record.run ?? record.runBase ?? null, "sessionSend.run"); const runBody = sessionSendRunBody(input.sessionId, runRecord); const commandBody: JsonRecord = { type: "turn", payload }; @@ -908,7 +921,7 @@ async function sendToSession(input: { store: AgentRunStore; sessionId: string; b runnerJobBytes: createRunnerJob ? jsonByteLength(runnerJobBody) : 0, valuesPrinted: false, }; - if (dryRun) return sessionSendPlan(input.sessionId, "turn", active, request, runBody); + if (dryRun) return sessionSendPlan(input.sessionId, "turn", active, request, runBody, null); const run = await input.store.createRun(validateCreateRun(runBody)); void emitAgentRunOtelSpan("run_created", run, process.env, { startTimeMs: startedAt, kind: 2, attributes: { "http.method": "POST", "http.route": "/api/v1/sessions/:sessionId/send", "http.status_code": 200, decision: "turn", backendProfile: run.backendProfile, providerId: run.providerId } }); const command = await input.store.createCommand(run.id, validateCreateCommand(commandBody)); @@ -925,7 +938,7 @@ async function sendToSession(input: { store: AgentRunStore; sessionId: string; b void emitAgentRunOtelSpan("runner_job_created", run, process.env, { command, startTimeMs: startedAt, kind: 2, attributes: { "http.method": "POST", "http.route": "/api/v1/sessions/:sessionId/send", "http.status_code": 200, decision: "turn", jobName: stringJsonValue(runnerJobRecord?.jobName), namespace: stringJsonValue(runnerJobRecord?.namespace) } }); } void emitAgentRunOtelSpan("session_send", run, process.env, { command, startTimeMs: startedAt, kind: 2, attributes: { "http.method": "POST", "http.route": "/api/v1/sessions/:sessionId/send", "http.status_code": 200, decision: "turn", createRunnerJob } }); - return sessionSendResponse({ sessionId: input.sessionId, decision: "turn", run, command, runnerJob, activeBefore: active, dryRun: false }); + return sessionSendResponse({ sessionId: input.sessionId, decision: "turn", run, command, runnerJob, activeBefore: active, reusedIdleRun: null, dryRun: false }); } async function activeReceivableCommand(store: AgentRunStore, session: SessionRecord): Promise<{ run: RunRecord; command: CommandRecord; reason: string; leaseExpired: boolean } | null> { @@ -941,6 +954,21 @@ async function activeReceivableCommand(store: AgentRunStore, session: SessionRec return { run, command, reason: "active-turn-running", leaseExpired }; } +async function idleReceivableRun(store: AgentRunStore, session: SessionRecord): Promise<{ run: RunRecord; lastCommand: CommandRecord; reason: string; leaseExpired: boolean } | null> { + if (!session.lastRunId || !session.lastCommandId) return null; + try { + const [run, lastCommand] = await Promise.all([store.getRun(session.lastRunId), store.getCommand(session.lastCommandId)]); + if (run.sessionRef?.sessionId !== session.sessionId || lastCommand.runId !== run.id) return null; + if (runIsTerminal(run) || !commandIsTerminal(lastCommand)) return null; + const leaseExpired = run.leaseExpiresAt ? Date.parse(run.leaseExpiresAt) <= Date.now() : false; + if (leaseExpired) return null; + if (run.status !== "claimed" && run.status !== "running") return null; + return { run, lastCommand, reason: "idle-run-reusable", leaseExpired }; + } catch { + return null; + } +} + function sessionSendPayload(record: JsonRecord): JsonRecord { const payload = asJsonRecord(record.payload); if (payload) return payload; @@ -955,7 +983,7 @@ function sessionSendRunBody(sessionId: string, runRecord: JsonRecord): JsonRecor return { ...runRecord, sessionRef: { ...sessionRef, sessionId, metadata } }; } -function sessionSendPlan(sessionId: string, decision: "steer" | "turn", active: Awaited>, request: JsonRecord, runBody: JsonRecord | null): JsonRecord { +function sessionSendPlan(sessionId: string, decision: "steer" | "turn", active: Awaited>, request: JsonRecord, runBody: JsonRecord | null, reusedIdleRun: JsonRecord | null = null): JsonRecord { return { action: "session-send-plan", dryRun: true, @@ -964,6 +992,7 @@ function sessionSendPlan(sessionId: string, decision: "steer" | "turn", active: decision, internalCommandType: decision, activeBefore: active ? activeBeforeSummary(active) : null, + reusedIdleRun, request, ...(runBody ? { run: { bodyBytes: jsonByteLength(runBody), sessionRef: summarizeSendSessionRef(runBody), valuesPrinted: false } } : {}), next: { confirm: managerActionDescriptor({ action: "send-session", operation: "send", resourceKind: "session", resourceName: sessionId, sessionId, inputKind: "prompt" }), note: "Remove --dry-run to perform the mutation. Manager will decide internal steer vs turn from durable session state." }, @@ -971,7 +1000,7 @@ function sessionSendPlan(sessionId: string, decision: "steer" | "turn", active: }; } -function sessionSendResponse(input: { sessionId: string; decision: "steer" | "turn"; run: RunRecord; command: CommandRecord; runnerJob: JsonValue; activeBefore: Awaited>; dryRun: false }): JsonRecord { +function sessionSendResponse(input: { sessionId: string; decision: "steer" | "turn"; run: RunRecord; command: CommandRecord; runnerJob: JsonValue; activeBefore: Awaited>; reusedIdleRun?: JsonRecord | null; dryRun: false }): JsonRecord { return { action: "session-send", dryRun: input.dryRun, @@ -983,6 +1012,7 @@ function sessionSendResponse(input: { sessionId: string; decision: "steer" | "tu command: input.command as unknown as JsonRecord, runnerJob: input.runnerJob, activeBefore: input.activeBefore ? activeBeforeSummary(input.activeBefore) : null, + reusedIdleRun: input.reusedIdleRun ?? null, pollActions: [ managerActionDescriptor({ action: "inspect-session", operation: "describe", resourceKind: "session", resourceName: input.sessionId, sessionId: input.sessionId, readerId: "cli" }), managerActionDescriptor({ action: "poll-trace", operation: "events", resourceKind: "run", resourceName: input.run.id, runId: input.run.id, commandId: input.command.id, sessionId: input.sessionId, afterSeq: 0, limit: 100 }), @@ -1016,6 +1046,19 @@ function activeBeforeSummary(active: NonNullable>>): JsonRecord { + return { + runId: idleRun.run.id, + commandId: idleRun.lastCommand.id, + runStatus: idleRun.run.status, + commandState: idleRun.lastCommand.state, + leaseExpiresAt: idleRun.run.leaseExpiresAt, + leaseExpired: idleRun.leaseExpired, + reason: idleRun.reason, + valuesPrinted: false, + }; +} + function summarizeSendSessionRef(runBody: JsonRecord): JsonRecord { const ref = asJsonRecord(runBody.sessionRef) ?? {}; return { sessionId: optionalString(ref.sessionId), conversationId: optionalString(ref.conversationId), threadId: optionalString(ref.threadId), metadataKeys: Object.keys(asJsonRecord(ref.metadata) ?? {}).sort(), valuesPrinted: false }; diff --git a/src/mgr/store.ts b/src/mgr/store.ts index 34decc2..d60531d 100644 --- a/src/mgr/store.ts +++ b/src/mgr/store.ts @@ -23,6 +23,7 @@ export interface AgentRunStore { createRun(input: CreateRunInput): MaybePromise; getRun(runId: string): MaybePromise; listEvents(runId: string, afterSeq: number, limit: number): MaybePromise; + listEventsForCommand(runId: string, commandId: string, limit: number): MaybePromise; createCommand(runId: string, input: CreateCommandInput): MaybePromise; getCommand(commandId: string): MaybePromise; listCommands(runId: string, afterSeq: number, limit: number): MaybePromise; @@ -111,7 +112,8 @@ export async function openAgentRunStoreFromEnv(env: NodeJS.ProcessEnv = process. const databaseUrl = env.DATABASE_URL?.trim(); if (databaseUrl) { const { createPostgresAgentRunStore } = await import("./postgres-store.js"); - return createPostgresAgentRunStore({ connectionString: databaseUrl, poolMax: optionalPositiveIntegerEnv(env, "AGENTRUN_POSTGRES_POOL_MAX") }); + const poolMax = optionalPositiveIntegerEnv(env, "AGENTRUN_POSTGRES_POOL_MAX"); + return createPostgresAgentRunStore({ connectionString: databaseUrl, ...(poolMax !== undefined ? { poolMax } : {}) }); } const storeMode = env.AGENTRUN_STORE ?? env.AGENTRUN_MGR_STORE; if (storeMode === "memory") return new MemoryAgentRunStore(); @@ -167,6 +169,14 @@ export class MemoryAgentRunStore implements AgentRunStore { return (this.eventsByRun.get(runId) ?? []).filter((event) => event.seq > afterSeq).slice(0, Math.max(1, Math.min(limit, 500))); } + listEventsForCommand(runId: string, commandId: string, limit: number): RunEvent[] { + this.getRun(runId); + const clamped = Math.max(1, Math.min(limit, 2_000)); + return (this.eventsByRun.get(runId) ?? []) + .filter((event) => eventMatchesCommand(event, commandId)) + .slice(0, clamped); + } + createCommand(runId: string, input: CreateCommandInput): CommandRecord { const run = this.getRun(runId); if (isTerminalRunStatus(run.status)) throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${runId} is already terminal: ${run.status}`, { httpStatus: 409 }); @@ -867,6 +877,11 @@ export function sessionTitleFromCommand(command: CommandRecord): string | null { return value.trim().replace(/\s+/gu, " ").slice(0, 120) || null; } +function eventMatchesCommand(event: RunEvent, commandId: string): boolean { + const payload = event.payload; + return payload.commandId === commandId || payload.targetCommandId === commandId || (event.type === "terminal_status" && payload.commandId === undefined); +} + export function isSessionOutputEvent(event: RunEvent): boolean { return event.type === "assistant_message" || event.type === "command_output" || event.type === "diff" || event.type === "error" || event.type === "terminal_status"; } diff --git a/src/runner/run-once.ts b/src/runner/run-once.ts index d9dc730..d752f38 100644 --- a/src/runner/run-once.ts +++ b/src/runner/run-once.ts @@ -131,15 +131,17 @@ export async function runOnce(options: RunnerOnceOptions): Promise { try { let idleSince = Date.now(); let firstPoll = true; + let commandPollAfterSeq = 0; while (true) { const currentRun = await api.getRun(options.runId); if (isTerminalRun(currentRun)) return { runner, claimed, terminalStatus: currentRun.terminalStatus, failureKind: currentRun.failureKind, run: currentRun, commandsProcessed: commandResults.length, commandResults, stopped: "run-terminal" } as JsonRecord; - const commandsResponse = await api.pollCommands(options.runId, { afterSeq: 0, limit: 20, ...(firstPoll && options.commandId ? { commandId: options.commandId } : {}) }); + const commandsResponse = await api.pollCommands(options.runId, { afterSeq: commandPollAfterSeq, limit: 50, ...(firstPoll && options.commandId ? { commandId: options.commandId } : {}) }); firstPoll = false; const command = commandsResponse.selected; if (!command) { await failPendingSteerWithoutActiveTurn(api, options.runId, commandsResponse.items, runner.id, attemptId, runnerLog, options.runnerJobId); + commandPollAfterSeq = Math.max(commandPollAfterSeq, lastCommandSeq(commandsResponse.items)); if (options.oneShot === true) return noPendingResult(runner, claimed, commandResults, commandsResponse.items.length, "one-shot-no-pending"); if (Date.now() - idleSince >= idleTimeoutMs) return noPendingResult(runner, claimed, commandResults, commandsResponse.items.length, "idle-timeout"); await sleep(pollIntervalMs); @@ -171,6 +173,7 @@ export async function runOnce(options: RunnerOnceOptions): Promise { ? await reportCommandFailure(api, options.runId, command.id, runner, attemptId, materializationFailure, "runner:resource-bundle", runnerLog, { terminalRun: true, ...(options.runnerJobId ? { runnerJobId: options.runnerJobId } : {}) }) : await executeCommand(api, withResourceAssembly(options, resourceEnv, initialPrompt), command, runner, attemptId, workspacePath, backendSession ?? (backendSession = createBackendSession(currentRun, withResourceAssembly(options, resourceEnv, initialPrompt))), runnerLog); commandResults.push(result); + commandPollAfterSeq = Math.max(commandPollAfterSeq, command.seq); idleSince = Date.now(); if (options.oneShot === true) { const run = await api.reportStatus(options.runId, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: null }); @@ -317,7 +320,7 @@ async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions, }; await appendBestEffort(api, options.runId, { type: "error", payload: retryPayload }); await appendBestEffort(api, options.runId, { type: "backend_status", payload: retryPayload }); - await runnerLog.write("command.retrying", { runId: options.runId, commandId: command.id, runnerId: runner.id, runnerJobId: options.runnerJobId ?? null, ...retryPayload, valuesPrinted: false }); + await runnerLog.write("command.retrying", { runId: options.runId, ...retryPayload, valuesPrinted: false }); if (backendSession) { const closeEvents = await backendSession.close(); for (const event of closeEvents) await appendBestEffort(api, options.runId, annotateCommandEvent(event, command.id, attemptId, runner.id)); @@ -720,6 +723,12 @@ function noPendingResult(runner: RunnerRecord, claimed: RunRecord, commandResult return { runner, claimed, terminalStatus: "blocked", failureKind: "schema-invalid", commandsProcessed: 0, commandResults, polledCommands, stopped }; } +function lastCommandSeq(commands: CommandRecord[]): number { + let seq = 0; + for (const command of commands) seq = Math.max(seq, command.seq); + return seq; +} + function sleep(ms: number): Promise { return new Promise((resolve) => setTimeout(resolve, ms)); }