Merge pull request #110 from pikasTech/fix/issue105-queue-cancel-run

修复 Queue cancel 未收敛 run 终态
This commit is contained in:
Lyon
2026-06-09 02:13:07 +08:00
committed by GitHub
3 changed files with 9 additions and 5 deletions
+2 -2
View File
@@ -985,8 +985,8 @@ CREATE TABLE IF NOT EXISTS agentrun_schema_migrations (
async cancelQueueTask(taskId: string, reason = "cancel requested"): Promise<QueueTaskRecord> {
const task = await this.getQueueTask(taskId);
if (isTerminalQueueTaskState(task.state)) return task;
if (task.latestAttempt?.commandId) await this.cancelCommand(task.latestAttempt.commandId, reason);
else if (task.latestAttempt?.runId) await this.cancelRun(task.latestAttempt.runId, reason);
if (task.latestAttempt?.runId) await this.cancelRun(task.latestAttempt.runId, reason);
else if (task.latestAttempt?.commandId) await this.cancelCommand(task.latestAttempt.commandId, reason);
return this.withTransaction(async (client) => {
const existing = await client.query("SELECT * FROM agentrun_queue_tasks WHERE id = $1 FOR UPDATE", [taskId]);
const row = existing.rows[0];
+2 -2
View File
@@ -495,8 +495,8 @@ export class MemoryAgentRunStore implements AgentRunStore {
cancelQueueTask(taskId: string, reason = "cancel requested"): QueueTaskRecord {
const task = this.getQueueTask(taskId);
if (isTerminalQueueTaskState(task.state)) return task;
if (task.latestAttempt?.commandId) this.cancelCommand(task.latestAttempt.commandId, reason);
else if (task.latestAttempt?.runId) this.cancelRun(task.latestAttempt.runId, reason);
if (task.latestAttempt?.runId) this.cancelRun(task.latestAttempt.runId, reason);
else if (task.latestAttempt?.commandId) this.cancelCommand(task.latestAttempt.commandId, reason);
const at = nowIso();
const latestAttempt = task.latestAttempt ? { ...task.latestAttempt, state: "cancelled" as const } : null;
const next: QueueTaskRecord = { ...task, state: "cancelled", latestAttempt, version: this.nextQueueVersion(), updatedAt: at, cancelledAt: at, cancelReason: reason };
+5 -1
View File
@@ -128,6 +128,10 @@ console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kin
assert.equal(cancelled.cancelReason, "self-test queue cancel propagation");
const cancelledCommand = await client.get(`/api/v1/runs/${cancelDispatched.run.id}/commands/${cancelDispatched.command.id}`) as { state?: string };
assert.equal(cancelledCommand.state, "cancelled");
const cancelledRun = await client.get(`/api/v1/runs/${cancelDispatched.run.id}`) as { status?: string; terminalStatus?: string; failureKind?: string };
assert.equal(cancelledRun.status, "cancelled");
assert.equal(cancelledRun.terminalStatus, "cancelled");
assert.equal(cancelledRun.failureKind, "cancelled");
const cancelledSession = await client.get("/api/v1/sessions/sess_queue_q2_cancel_selftest") as { executionState?: string; terminalStatus?: string; failureKind?: string; activeRunId?: string | null; activeCommandId?: string | null };
assert.equal(cancelledSession.executionState, "terminal");
assert.equal(cancelledSession.terminalStatus, "cancelled");
@@ -138,7 +142,7 @@ console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kin
assert.ok(JSON.stringify(cancelManifest).includes(cancelDispatched.run.id));
assertNoSecretLeak(dispatched);
assertNoSecretLeak(cancelled);
return { name: "queue-q2-dispatch", tests: ["queue-dispatch-run-command-runner-job", "queue-refresh-from-core-status", "queue-dispatch-no-repeat", "queue-cancel-propagates-to-command-session"] };
return { name: "queue-q2-dispatch", tests: ["queue-dispatch-run-command-runner-job", "queue-refresh-from-core-status", "queue-dispatch-no-repeat", "queue-cancel-propagates-to-run-command-session"] };
} finally {
await new Promise<void>((resolve) => server.server.close(() => resolve()));
}