Merge pull request #227 from pikasTech/fix-v02-session-runner-reuse-20260622
修复 v0.2 session runner 复用与状态误报
This commit is contained in:
@@ -477,6 +477,23 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
|
||||
return result.rows.map(eventFromRow);
|
||||
}
|
||||
|
||||
async listEventsForCommand(runId: string, commandId: string, limit: number): Promise<RunEvent[]> {
|
||||
await this.getRun(runId);
|
||||
const result = await this.pool.query(
|
||||
`SELECT * FROM agentrun_events
|
||||
WHERE run_id = $1
|
||||
AND (
|
||||
payload->>'commandId' = $2
|
||||
OR payload->>'targetCommandId' = $2
|
||||
OR (type = 'terminal_status' AND NOT (payload ? 'commandId'))
|
||||
)
|
||||
ORDER BY seq ASC
|
||||
LIMIT $3`,
|
||||
[runId, commandId, clamp(limit, 1, 2_000)],
|
||||
);
|
||||
return result.rows.map(eventFromRow);
|
||||
}
|
||||
|
||||
async createCommand(runId: string, input: CreateCommandInput): Promise<CommandRecord> {
|
||||
const payloadHash = stableHash(input.payload);
|
||||
return this.withTransaction(async (client) => {
|
||||
|
||||
@@ -79,7 +79,7 @@ export async function reconcileRunnerJobsOnce(input: RunnerReconcilerOptions): P
|
||||
if (terminalOutbox) observation = mergeTerminalOutboxObservation(observation, terminalOutbox);
|
||||
await input.store.updateRunnerJobResult(job.id, { observation });
|
||||
if (stringValue(observation.category) === "runner-job-observe-failed") observeFailedCount++;
|
||||
const runClosure = await closeOpenRunWhenCommandTerminal(input.store, job);
|
||||
const runClosure = await closeOpenRunWhenCommandTerminal(input.store, job, observation);
|
||||
if (runClosure.closed === true) runClosureCount++;
|
||||
items.push({
|
||||
runnerJobId: job.id,
|
||||
@@ -295,7 +295,7 @@ function observationFromObjects(job: RunnerJobRecord, namespace: string, observe
|
||||
};
|
||||
}
|
||||
|
||||
async function closeOpenRunWhenCommandTerminal(store: AgentRunStore, job: RunnerJobRecord): Promise<JsonRecord> {
|
||||
async function closeOpenRunWhenCommandTerminal(store: AgentRunStore, job: RunnerJobRecord, observation: JsonRecord): Promise<JsonRecord> {
|
||||
let command: CommandRecord;
|
||||
try {
|
||||
command = await store.getCommand(job.commandId);
|
||||
@@ -303,6 +303,8 @@ async function closeOpenRunWhenCommandTerminal(store: AgentRunStore, job: Runner
|
||||
return { closed: false, state: "command-missing", valuesPrinted: false };
|
||||
}
|
||||
if (!isTerminalCommandState(command.state)) return { closed: false, state: "command-open", commandState: command.state, valuesPrinted: false };
|
||||
const observedRunnerPhase = stringValue(observation.observedRunnerPhase) ?? stringValue(observation.phase);
|
||||
if (!runnerJobAllowsRunClosure(observedRunnerPhase)) return { closed: false, state: "runner-job-still-active", observedRunnerPhase: observedRunnerPhase ?? "unknown", commandState: command.state, valuesPrinted: false };
|
||||
const run = await store.getRun(job.runId);
|
||||
if (isTerminalRunStatus(run.status)) return { closed: false, state: "run-terminal", runStatus: run.status, commandState: command.state, valuesPrinted: false };
|
||||
const terminalStatus = terminalStatusFromCommand(command);
|
||||
@@ -312,6 +314,10 @@ async function closeOpenRunWhenCommandTerminal(store: AgentRunStore, job: Runner
|
||||
return { closed: true, state: "closed-open-run", runStatus: next.status, terminalStatus, commandState: command.state, valuesPrinted: false };
|
||||
}
|
||||
|
||||
function runnerJobAllowsRunClosure(phase: string | null): boolean {
|
||||
return phase === "k8s:succeeded" || phase === "k8s:failed" || phase === "k8s:missing";
|
||||
}
|
||||
|
||||
function terminalStatusFromCommand(command: CommandRecord): TerminalStatus {
|
||||
if (command.state === "completed") return "completed";
|
||||
if (command.state === "cancelled") return "cancelled";
|
||||
|
||||
+51
-8
@@ -436,7 +436,7 @@ function compactRecoveryActions(value: JsonValue | undefined): JsonValue[] {
|
||||
}
|
||||
|
||||
function commandResultOtelAttributes(result: JsonValue): JsonRecord {
|
||||
const record = asJsonRecord(result);
|
||||
const record = asJsonRecord(result) ?? {};
|
||||
const terminalClassification = asJsonRecord(record.terminalClassification);
|
||||
const diagnosis = asJsonRecord(record.diagnosis);
|
||||
const runnerJob = asJsonRecord(diagnosis?.runnerJob);
|
||||
@@ -771,8 +771,8 @@ async function route({ method, url, body, store, sourceCommit, authSummary, runn
|
||||
const runId = runnerJobMatch[1] ?? "";
|
||||
const commandId = url.searchParams.get("commandId") ?? undefined;
|
||||
const jobs = await store.listRunnerJobs(runId, commandId);
|
||||
const events = await store.listEvents(runId, 0, 500);
|
||||
return { items: jobs.map((job) => runnerJobStatusSummary(job, events)), count: jobs.length, lastSeq: events.at(-1)?.seq ?? 0 };
|
||||
const items = await Promise.all(jobs.map(async (job) => runnerJobStatusSummary(job, await store.listEventsForCommand(runId, job.commandId, 2_000))));
|
||||
return { items, count: jobs.length };
|
||||
}
|
||||
const runnerJobShowMatch = path.match(/^\/api\/v1\/runs\/([^/]+)\/runner-jobs\/([^/]+)$/u);
|
||||
if (method === "GET" && runnerJobShowMatch) {
|
||||
@@ -781,7 +781,7 @@ async function route({ method, url, body, store, sourceCommit, authSummary, runn
|
||||
const jobs = await store.listRunnerJobs(runId);
|
||||
const job = jobs.find((item) => item.id === runnerJobId);
|
||||
if (!job) throw new AgentRunError("schema-invalid", `runner job ${runnerJobId} was not found`, { httpStatus: 404 });
|
||||
return runnerJobStatusSummary(job, await store.listEvents(runId, 0, 500)) as JsonValue;
|
||||
return runnerJobStatusSummary(job, await store.listEventsForCommand(runId, job.commandId, 2_000)) as JsonValue;
|
||||
}
|
||||
if (method === "POST" && path === "/api/v1/reconciler/runner-jobs") {
|
||||
const record = body === null ? {} : asRecord(body, "runnerReconciler");
|
||||
@@ -892,6 +892,19 @@ async function sendToSession(input: { store: AgentRunStore; sessionId: string; b
|
||||
return sessionSendResponse({ sessionId: input.sessionId, decision: "steer", run: active.run, command, runnerJob: null, activeBefore: active, dryRun: false });
|
||||
}
|
||||
|
||||
const idleRun = existing ? await idleReceivableRun(input.store, existing) : null;
|
||||
if (idleRun) {
|
||||
const commandBody: JsonRecord = { type: "turn", payload };
|
||||
if (commandIdempotencyKey) commandBody.idempotencyKey = commandIdempotencyKey;
|
||||
const request = { method: "POST", path: `/api/v1/runs/${idleRun.run.id}/commands`, commandType: "turn", payloadBytes: jsonByteLength(payload), reusedIdleRun: true, valuesPrinted: false };
|
||||
if (dryRun) return sessionSendPlan(input.sessionId, "turn", active, request, null, idleRunSummary(idleRun));
|
||||
const command = await input.store.createCommand(idleRun.run.id, validateCreateCommand(commandBody));
|
||||
const run = await input.store.getRun(idleRun.run.id);
|
||||
void emitAgentRunOtelSpan("command_created", run, process.env, { command, startTimeMs: startedAt, kind: 2, attributes: { "http.method": "POST", "http.route": "/api/v1/sessions/:sessionId/send", "http.status_code": 200, decision: "turn", reusedIdleRun: true, commandType: command.type, commandState: command.state } });
|
||||
void emitAgentRunOtelSpan("session_send", run, process.env, { command, startTimeMs: startedAt, kind: 2, attributes: { "http.method": "POST", "http.route": "/api/v1/sessions/:sessionId/send", "http.status_code": 200, decision: "turn", createRunnerJob: false, reusedIdleRun: true } });
|
||||
return sessionSendResponse({ sessionId: input.sessionId, decision: "turn", run, command, runnerJob: null, activeBefore: active, reusedIdleRun: idleRunSummary(idleRun), dryRun: false });
|
||||
}
|
||||
|
||||
const runRecord = asRecord(record.run ?? record.runBase ?? null, "sessionSend.run");
|
||||
const runBody = sessionSendRunBody(input.sessionId, runRecord);
|
||||
const commandBody: JsonRecord = { type: "turn", payload };
|
||||
@@ -908,7 +921,7 @@ async function sendToSession(input: { store: AgentRunStore; sessionId: string; b
|
||||
runnerJobBytes: createRunnerJob ? jsonByteLength(runnerJobBody) : 0,
|
||||
valuesPrinted: false,
|
||||
};
|
||||
if (dryRun) return sessionSendPlan(input.sessionId, "turn", active, request, runBody);
|
||||
if (dryRun) return sessionSendPlan(input.sessionId, "turn", active, request, runBody, null);
|
||||
const run = await input.store.createRun(validateCreateRun(runBody));
|
||||
void emitAgentRunOtelSpan("run_created", run, process.env, { startTimeMs: startedAt, kind: 2, attributes: { "http.method": "POST", "http.route": "/api/v1/sessions/:sessionId/send", "http.status_code": 200, decision: "turn", backendProfile: run.backendProfile, providerId: run.providerId } });
|
||||
const command = await input.store.createCommand(run.id, validateCreateCommand(commandBody));
|
||||
@@ -925,7 +938,7 @@ async function sendToSession(input: { store: AgentRunStore; sessionId: string; b
|
||||
void emitAgentRunOtelSpan("runner_job_created", run, process.env, { command, startTimeMs: startedAt, kind: 2, attributes: { "http.method": "POST", "http.route": "/api/v1/sessions/:sessionId/send", "http.status_code": 200, decision: "turn", jobName: stringJsonValue(runnerJobRecord?.jobName), namespace: stringJsonValue(runnerJobRecord?.namespace) } });
|
||||
}
|
||||
void emitAgentRunOtelSpan("session_send", run, process.env, { command, startTimeMs: startedAt, kind: 2, attributes: { "http.method": "POST", "http.route": "/api/v1/sessions/:sessionId/send", "http.status_code": 200, decision: "turn", createRunnerJob } });
|
||||
return sessionSendResponse({ sessionId: input.sessionId, decision: "turn", run, command, runnerJob, activeBefore: active, dryRun: false });
|
||||
return sessionSendResponse({ sessionId: input.sessionId, decision: "turn", run, command, runnerJob, activeBefore: active, reusedIdleRun: null, dryRun: false });
|
||||
}
|
||||
|
||||
async function activeReceivableCommand(store: AgentRunStore, session: SessionRecord): Promise<{ run: RunRecord; command: CommandRecord; reason: string; leaseExpired: boolean } | null> {
|
||||
@@ -941,6 +954,21 @@ async function activeReceivableCommand(store: AgentRunStore, session: SessionRec
|
||||
return { run, command, reason: "active-turn-running", leaseExpired };
|
||||
}
|
||||
|
||||
async function idleReceivableRun(store: AgentRunStore, session: SessionRecord): Promise<{ run: RunRecord; lastCommand: CommandRecord; reason: string; leaseExpired: boolean } | null> {
|
||||
if (!session.lastRunId || !session.lastCommandId) return null;
|
||||
try {
|
||||
const [run, lastCommand] = await Promise.all([store.getRun(session.lastRunId), store.getCommand(session.lastCommandId)]);
|
||||
if (run.sessionRef?.sessionId !== session.sessionId || lastCommand.runId !== run.id) return null;
|
||||
if (runIsTerminal(run) || !commandIsTerminal(lastCommand)) return null;
|
||||
const leaseExpired = run.leaseExpiresAt ? Date.parse(run.leaseExpiresAt) <= Date.now() : false;
|
||||
if (leaseExpired) return null;
|
||||
if (run.status !== "claimed" && run.status !== "running") return null;
|
||||
return { run, lastCommand, reason: "idle-run-reusable", leaseExpired };
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function sessionSendPayload(record: JsonRecord): JsonRecord {
|
||||
const payload = asJsonRecord(record.payload);
|
||||
if (payload) return payload;
|
||||
@@ -955,7 +983,7 @@ function sessionSendRunBody(sessionId: string, runRecord: JsonRecord): JsonRecor
|
||||
return { ...runRecord, sessionRef: { ...sessionRef, sessionId, metadata } };
|
||||
}
|
||||
|
||||
function sessionSendPlan(sessionId: string, decision: "steer" | "turn", active: Awaited<ReturnType<typeof activeReceivableCommand>>, request: JsonRecord, runBody: JsonRecord | null): JsonRecord {
|
||||
function sessionSendPlan(sessionId: string, decision: "steer" | "turn", active: Awaited<ReturnType<typeof activeReceivableCommand>>, request: JsonRecord, runBody: JsonRecord | null, reusedIdleRun: JsonRecord | null = null): JsonRecord {
|
||||
return {
|
||||
action: "session-send-plan",
|
||||
dryRun: true,
|
||||
@@ -964,6 +992,7 @@ function sessionSendPlan(sessionId: string, decision: "steer" | "turn", active:
|
||||
decision,
|
||||
internalCommandType: decision,
|
||||
activeBefore: active ? activeBeforeSummary(active) : null,
|
||||
reusedIdleRun,
|
||||
request,
|
||||
...(runBody ? { run: { bodyBytes: jsonByteLength(runBody), sessionRef: summarizeSendSessionRef(runBody), valuesPrinted: false } } : {}),
|
||||
next: { confirm: managerActionDescriptor({ action: "send-session", operation: "send", resourceKind: "session", resourceName: sessionId, sessionId, inputKind: "prompt" }), note: "Remove --dry-run to perform the mutation. Manager will decide internal steer vs turn from durable session state." },
|
||||
@@ -971,7 +1000,7 @@ function sessionSendPlan(sessionId: string, decision: "steer" | "turn", active:
|
||||
};
|
||||
}
|
||||
|
||||
function sessionSendResponse(input: { sessionId: string; decision: "steer" | "turn"; run: RunRecord; command: CommandRecord; runnerJob: JsonValue; activeBefore: Awaited<ReturnType<typeof activeReceivableCommand>>; dryRun: false }): JsonRecord {
|
||||
function sessionSendResponse(input: { sessionId: string; decision: "steer" | "turn"; run: RunRecord; command: CommandRecord; runnerJob: JsonValue; activeBefore: Awaited<ReturnType<typeof activeReceivableCommand>>; reusedIdleRun?: JsonRecord | null; dryRun: false }): JsonRecord {
|
||||
return {
|
||||
action: "session-send",
|
||||
dryRun: input.dryRun,
|
||||
@@ -983,6 +1012,7 @@ function sessionSendResponse(input: { sessionId: string; decision: "steer" | "tu
|
||||
command: input.command as unknown as JsonRecord,
|
||||
runnerJob: input.runnerJob,
|
||||
activeBefore: input.activeBefore ? activeBeforeSummary(input.activeBefore) : null,
|
||||
reusedIdleRun: input.reusedIdleRun ?? null,
|
||||
pollActions: [
|
||||
managerActionDescriptor({ action: "inspect-session", operation: "describe", resourceKind: "session", resourceName: input.sessionId, sessionId: input.sessionId, readerId: "cli" }),
|
||||
managerActionDescriptor({ action: "poll-trace", operation: "events", resourceKind: "run", resourceName: input.run.id, runId: input.run.id, commandId: input.command.id, sessionId: input.sessionId, afterSeq: 0, limit: 100 }),
|
||||
@@ -1016,6 +1046,19 @@ function activeBeforeSummary(active: NonNullable<Awaited<ReturnType<typeof activ
|
||||
return { runId: active.run.id, commandId: active.command.id, commandState: active.command.state, runStatus: active.run.status, leaseExpiresAt: active.run.leaseExpiresAt, leaseExpired: active.leaseExpired, reason: active.reason, valuesPrinted: false };
|
||||
}
|
||||
|
||||
function idleRunSummary(idleRun: NonNullable<Awaited<ReturnType<typeof idleReceivableRun>>>): JsonRecord {
|
||||
return {
|
||||
runId: idleRun.run.id,
|
||||
commandId: idleRun.lastCommand.id,
|
||||
runStatus: idleRun.run.status,
|
||||
commandState: idleRun.lastCommand.state,
|
||||
leaseExpiresAt: idleRun.run.leaseExpiresAt,
|
||||
leaseExpired: idleRun.leaseExpired,
|
||||
reason: idleRun.reason,
|
||||
valuesPrinted: false,
|
||||
};
|
||||
}
|
||||
|
||||
function summarizeSendSessionRef(runBody: JsonRecord): JsonRecord {
|
||||
const ref = asJsonRecord(runBody.sessionRef) ?? {};
|
||||
return { sessionId: optionalString(ref.sessionId), conversationId: optionalString(ref.conversationId), threadId: optionalString(ref.threadId), metadataKeys: Object.keys(asJsonRecord(ref.metadata) ?? {}).sort(), valuesPrinted: false };
|
||||
|
||||
+16
-1
@@ -23,6 +23,7 @@ export interface AgentRunStore {
|
||||
createRun(input: CreateRunInput): MaybePromise<RunRecord>;
|
||||
getRun(runId: string): MaybePromise<RunRecord>;
|
||||
listEvents(runId: string, afterSeq: number, limit: number): MaybePromise<RunEvent[]>;
|
||||
listEventsForCommand(runId: string, commandId: string, limit: number): MaybePromise<RunEvent[]>;
|
||||
createCommand(runId: string, input: CreateCommandInput): MaybePromise<CommandRecord>;
|
||||
getCommand(commandId: string): MaybePromise<CommandRecord>;
|
||||
listCommands(runId: string, afterSeq: number, limit: number): MaybePromise<CommandRecord[]>;
|
||||
@@ -111,7 +112,8 @@ export async function openAgentRunStoreFromEnv(env: NodeJS.ProcessEnv = process.
|
||||
const databaseUrl = env.DATABASE_URL?.trim();
|
||||
if (databaseUrl) {
|
||||
const { createPostgresAgentRunStore } = await import("./postgres-store.js");
|
||||
return createPostgresAgentRunStore({ connectionString: databaseUrl, poolMax: optionalPositiveIntegerEnv(env, "AGENTRUN_POSTGRES_POOL_MAX") });
|
||||
const poolMax = optionalPositiveIntegerEnv(env, "AGENTRUN_POSTGRES_POOL_MAX");
|
||||
return createPostgresAgentRunStore({ connectionString: databaseUrl, ...(poolMax !== undefined ? { poolMax } : {}) });
|
||||
}
|
||||
const storeMode = env.AGENTRUN_STORE ?? env.AGENTRUN_MGR_STORE;
|
||||
if (storeMode === "memory") return new MemoryAgentRunStore();
|
||||
@@ -167,6 +169,14 @@ export class MemoryAgentRunStore implements AgentRunStore {
|
||||
return (this.eventsByRun.get(runId) ?? []).filter((event) => event.seq > afterSeq).slice(0, Math.max(1, Math.min(limit, 500)));
|
||||
}
|
||||
|
||||
listEventsForCommand(runId: string, commandId: string, limit: number): RunEvent[] {
|
||||
this.getRun(runId);
|
||||
const clamped = Math.max(1, Math.min(limit, 2_000));
|
||||
return (this.eventsByRun.get(runId) ?? [])
|
||||
.filter((event) => eventMatchesCommand(event, commandId))
|
||||
.slice(0, clamped);
|
||||
}
|
||||
|
||||
createCommand(runId: string, input: CreateCommandInput): CommandRecord {
|
||||
const run = this.getRun(runId);
|
||||
if (isTerminalRunStatus(run.status)) throw new AgentRunError(run.failureKind ?? (run.status === "cancelled" ? "cancelled" : "schema-invalid"), `run ${runId} is already terminal: ${run.status}`, { httpStatus: 409 });
|
||||
@@ -867,6 +877,11 @@ export function sessionTitleFromCommand(command: CommandRecord): string | null {
|
||||
return value.trim().replace(/\s+/gu, " ").slice(0, 120) || null;
|
||||
}
|
||||
|
||||
function eventMatchesCommand(event: RunEvent, commandId: string): boolean {
|
||||
const payload = event.payload;
|
||||
return payload.commandId === commandId || payload.targetCommandId === commandId || (event.type === "terminal_status" && payload.commandId === undefined);
|
||||
}
|
||||
|
||||
export function isSessionOutputEvent(event: RunEvent): boolean {
|
||||
return event.type === "assistant_message" || event.type === "command_output" || event.type === "diff" || event.type === "error" || event.type === "terminal_status";
|
||||
}
|
||||
|
||||
+11
-2
@@ -131,15 +131,17 @@ export async function runOnce(options: RunnerOnceOptions): Promise<JsonRecord> {
|
||||
try {
|
||||
let idleSince = Date.now();
|
||||
let firstPoll = true;
|
||||
let commandPollAfterSeq = 0;
|
||||
while (true) {
|
||||
const currentRun = await api.getRun(options.runId);
|
||||
if (isTerminalRun(currentRun)) return { runner, claimed, terminalStatus: currentRun.terminalStatus, failureKind: currentRun.failureKind, run: currentRun, commandsProcessed: commandResults.length, commandResults, stopped: "run-terminal" } as JsonRecord;
|
||||
|
||||
const commandsResponse = await api.pollCommands(options.runId, { afterSeq: 0, limit: 20, ...(firstPoll && options.commandId ? { commandId: options.commandId } : {}) });
|
||||
const commandsResponse = await api.pollCommands(options.runId, { afterSeq: commandPollAfterSeq, limit: 50, ...(firstPoll && options.commandId ? { commandId: options.commandId } : {}) });
|
||||
firstPoll = false;
|
||||
const command = commandsResponse.selected;
|
||||
if (!command) {
|
||||
await failPendingSteerWithoutActiveTurn(api, options.runId, commandsResponse.items, runner.id, attemptId, runnerLog, options.runnerJobId);
|
||||
commandPollAfterSeq = Math.max(commandPollAfterSeq, lastCommandSeq(commandsResponse.items));
|
||||
if (options.oneShot === true) return noPendingResult(runner, claimed, commandResults, commandsResponse.items.length, "one-shot-no-pending");
|
||||
if (Date.now() - idleSince >= idleTimeoutMs) return noPendingResult(runner, claimed, commandResults, commandsResponse.items.length, "idle-timeout");
|
||||
await sleep(pollIntervalMs);
|
||||
@@ -171,6 +173,7 @@ export async function runOnce(options: RunnerOnceOptions): Promise<JsonRecord> {
|
||||
? await reportCommandFailure(api, options.runId, command.id, runner, attemptId, materializationFailure, "runner:resource-bundle", runnerLog, { terminalRun: true, ...(options.runnerJobId ? { runnerJobId: options.runnerJobId } : {}) })
|
||||
: await executeCommand(api, withResourceAssembly(options, resourceEnv, initialPrompt), command, runner, attemptId, workspacePath, backendSession ?? (backendSession = createBackendSession(currentRun, withResourceAssembly(options, resourceEnv, initialPrompt))), runnerLog);
|
||||
commandResults.push(result);
|
||||
commandPollAfterSeq = Math.max(commandPollAfterSeq, command.seq);
|
||||
idleSince = Date.now();
|
||||
if (options.oneShot === true) {
|
||||
const run = await api.reportStatus(options.runId, { terminalStatus: result.terminalStatus, failureKind: result.failureKind, failureMessage: null });
|
||||
@@ -317,7 +320,7 @@ async function executeCommand(api: RunnerManagerApi, options: RunnerOnceOptions,
|
||||
};
|
||||
await appendBestEffort(api, options.runId, { type: "error", payload: retryPayload });
|
||||
await appendBestEffort(api, options.runId, { type: "backend_status", payload: retryPayload });
|
||||
await runnerLog.write("command.retrying", { runId: options.runId, commandId: command.id, runnerId: runner.id, runnerJobId: options.runnerJobId ?? null, ...retryPayload, valuesPrinted: false });
|
||||
await runnerLog.write("command.retrying", { runId: options.runId, ...retryPayload, valuesPrinted: false });
|
||||
if (backendSession) {
|
||||
const closeEvents = await backendSession.close();
|
||||
for (const event of closeEvents) await appendBestEffort(api, options.runId, annotateCommandEvent(event, command.id, attemptId, runner.id));
|
||||
@@ -720,6 +723,12 @@ function noPendingResult(runner: RunnerRecord, claimed: RunRecord, commandResult
|
||||
return { runner, claimed, terminalStatus: "blocked", failureKind: "schema-invalid", commandsProcessed: 0, commandResults, polledCommands, stopped };
|
||||
}
|
||||
|
||||
function lastCommandSeq(commands: CommandRecord[]): number {
|
||||
let seq = 0;
|
||||
for (const command of commands) seq = Math.max(seq, command.seq);
|
||||
return seq;
|
||||
}
|
||||
|
||||
function sleep(ms: number): Promise<void> {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user