Files
pikasTech-agentrun/src/selftest/cases/75-queue-q2-dispatch.ts
T
2026-06-10 11:10:19 +08:00

264 lines
15 KiB
TypeScript

import assert from "node:assert/strict";
import { chmod, readFile, writeFile } from "node:fs/promises";
import path from "node:path";
import { startManagerServer } from "../../mgr/server.js";
import { MemoryAgentRunStore } from "../../mgr/store.js";
import { ManagerClient } from "../../mgr/client.js";
import type { JsonRecord, QueueDispatchResult, QueueTaskRecord } from "../../common/types.js";
import { assertNoSecretLeak, type SelfTestCase } from "../harness.js";
/**
 * Self-test case "queue-q2-dispatch".
 *
 * Starts an in-process manager server backed by a `MemoryAgentRunStore` and a
 * fake `kubectl` script, then drives the queue HTTP API through four scenarios:
 * dispatch + completion, dispatch with a unidesk-ssh tool credential, a blocked
 * task, and a cancelled task. The server is always closed in `finally`.
 *
 * @param context Harness context; this case reads `context.tmp`,
 *   `context.workspace` and `context.codexHome`.
 * @returns The case name and the list of sub-test identifiers it covers.
 */
const selfTest: SelfTestCase = async (context) => {
// Paths inside the harness temp dir: the fake kubectl script and the file where
// that script records the most recently created Job manifest.
const fakeKubectl = path.join(context.tmp, "fake-kubectl-queue-q2.js");
const createdManifest = path.join(context.tmp, "created-queue-q2-runner-job.json");
// The fake kubectl (a Bun script) supports three invocations:
//   - `create`: parses the manifest from stdin, persists it to `createdManifest`
//     when kind is "Job" (overwriting any previous one), and prints metadata with a fixed uid;
//   - `patch secret` / `delete secret`: print a canned success object;
// anything else is reported on stderr and exits with code 1.
await writeFile(fakeKubectl, `#!/usr/bin/env bun
const args = Bun.argv.slice(2);
async function readStdin() {
const chunks = [];
for await (const chunk of Bun.stdin.stream()) chunks.push(chunk);
return Buffer.concat(chunks.map((chunk) => Buffer.from(chunk))).toString("utf8");
}
if (args[0] === "create") {
const text = await readStdin();
const manifest = JSON.parse(text);
if (manifest.kind === "Job") await Bun.write(${JSON.stringify(createdManifest)}, text);
const uid = manifest.kind === "Job" ? "job-uid-queue-q2" : "secret-uid-queue-q2";
console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kind, metadata: { uid, resourceVersion: "1", name: manifest.metadata.name, namespace: manifest.metadata.namespace } }));
process.exit(0);
}
if (args[0] === "patch" && args[1] === "secret") {
console.log(JSON.stringify({ apiVersion: "v1", kind: "Secret", metadata: { uid: "secret-uid-queue-q2", resourceVersion: "2", name: args[2], namespace: args[4] } }));
process.exit(0);
}
if (args[0] === "delete" && args[1] === "secret") {
console.log(JSON.stringify({ kind: "Status", status: "Success" }));
process.exit(0);
}
console.error("unsupported fake kubectl args: " + args.join(" "));
process.exit(1);
`);
// Make the script executable so it can be used as the kubectl command.
await chmod(fakeKubectl, 0o755);
const store = new MemoryAgentRunStore();
// Port 0 on 127.0.0.1; the resulting address is read back from `server.baseUrl`.
// Runner jobs are routed through the fake kubectl via `kubectlCommand`.
const server = await startManagerServer({
port: 0,
host: "127.0.0.1",
sourceCommit: "self-test",
store,
runnerJobDefaults: {
namespace: "agentrun-v01",
managerUrl: "http://agentrun-mgr.agentrun-v01.svc.cluster.local:8080",
image: "127.0.0.1:5000/agentrun/agentrun-mgr@sha256:1111111111111111111111111111111111111111111111111111111111111111",
kubectlCommand: fakeKubectl,
unideskSshEndpointEnv: { name: "UNIDESK_MAIN_SERVER_IP", value: "https://unidesk.default.example.test" },
},
});
try {
const client = new ManagerClient(server.baseUrl);
// --- Scenario 1: create a task, dispatch it, and complete it. ---
const created = await client.post("/api/v1/queue/tasks", {
tenantId: "unidesk",
projectId: "pikasTech/unidesk",
queue: "dev",
lane: "q2",
title: "Q2 queue dispatch task",
priority: 20,
backendProfile: "codex",
providerId: "G14",
workspaceRef: { kind: "host-path", path: context.workspace },
sessionRef: { sessionId: "sess_queue_q2_dispatch_selftest", metadata: { source: "queue-q2-self-test" } },
executionPolicy: {
sandbox: "workspace-write",
approval: "never",
timeoutMs: 15_000,
network: "default",
secretScope: { allowCredentialEcho: false, providerCredentials: [{ profile: "codex", secretRef: { name: "agentrun-v01-provider-codex", keys: ["auth.json", "config.toml"], mountPath: context.codexHome } }] },
},
resourceBundleRef: null,
payload: { prompt: "queue dispatch hello" },
references: [{ kind: "issue", url: "https://github.com/pikasTech/agentrun/issues/39" }],
metadata: { source: "queue-q2-self-test" },
idempotencyKey: "queue-q2-dispatch-self-test",
}) as QueueTaskRecord;
// Dispatch with an explicit attempt id and check that the result ties the
// attempt to the run, command, runner job and session it reports.
const dispatched = await client.post(`/api/v1/queue/tasks/${created.id}/dispatch`, { attemptId: "attempt_queue_q2_selftest" }) as QueueDispatchResult;
assert.equal(dispatched.action, "queue-dispatch");
assert.equal(dispatched.mutation, true);
assert.equal(dispatched.latestAttempt.attemptId, "attempt_queue_q2_selftest");
assert.equal(dispatched.latestAttempt.runId, dispatched.run.id);
assert.equal(dispatched.latestAttempt.commandId, dispatched.command.id);
assert.ok(dispatched.latestAttempt.runnerJobId);
assert.equal(dispatched.latestAttempt.sessionId, "sess_queue_q2_dispatch_selftest");
assert.equal(dispatched.latestAttempt.sessionPath, "/api/v1/sessions/sess_queue_q2_dispatch_selftest");
assert.equal(dispatched.task.state, "running");
assert.equal(dispatched.task.latestAttempt?.attemptId, "attempt_queue_q2_selftest");
assert.equal(dispatched.task.sessionPath, "/api/v1/sessions/sess_queue_q2_dispatch_selftest");
assert.equal(dispatched.run.sessionRef?.sessionId, "sess_queue_q2_dispatch_selftest");
// The task read view must report the same attempt linkage as the dispatch response.
const shown = await client.get(`/api/v1/queue/tasks/${created.id}`) as QueueTaskRecord;
assert.equal(shown.state, "running");
assert.equal(shown.latestAttempt?.runId, dispatched.run.id);
assert.equal(shown.latestAttempt?.commandId, dispatched.command.id);
assert.equal(shown.latestAttempt?.runnerJobId, dispatched.latestAttempt.runnerJobId);
// Exactly one runner job is listed for the command, carrying the attempt id.
const jobs = await client.get(`/api/v1/runs/${dispatched.run.id}/runner-jobs?commandId=${dispatched.command.id}`) as { items?: JsonRecord[]; count?: number };
assert.equal(jobs.count, 1);
assert.equal(jobs.items?.[0]?.attemptId, "attempt_queue_q2_selftest");
// The run's event stream must contain an event whose payload phase is "queue-dispatched".
const events = await client.get(`/api/v1/runs/${dispatched.run.id}/events?afterSeq=0&limit=100`) as { items?: JsonRecord[] };
assert.ok(events.items?.some((item) => ((item.payload as JsonRecord).phase) === "queue-dispatched"));
// Dispatching the same task a second time must be rejected with a "not pending" error.
await assert.rejects(
() => client.post(`/api/v1/queue/tasks/${created.id}/dispatch`, { attemptId: "attempt_queue_q2_duplicate" }),
(error) => error instanceof Error && error.message.includes("not pending"),
);
// Mark the command completed, then read the task without calling /refresh first:
// the read view is expected to already show the terminal state.
await client.patch(`/api/v1/commands/${dispatched.command.id}/status`, { terminalStatus: "completed", failureKind: null, failureMessage: null });
const autoShown = await client.get(`/api/v1/queue/tasks/${created.id}`) as QueueTaskRecord;
assert.equal(autoShown.state, "completed");
assert.equal(autoShown.latestAttempt?.state, "completed");
assert.equal(autoShown.latestAttempt?.runId, dispatched.run.id);
// The commander view for queue "dev" must list the task as completed and count it in stats.byState.
const commander = await client.get("/api/v1/queue/commander?queue=dev&readerId=queue-q2-self-test") as { items?: QueueTaskRecord[]; stats?: JsonRecord };
const commanderTask = commander.items?.find((item) => item.id === created.id);
assert.equal(commanderTask?.state, "completed");
assert.equal(commanderTask?.latestAttempt?.state, "completed");
assert.equal(((commander.stats?.byState as JsonRecord).completed), 1);
// An explicit refresh returns the same terminal state and attempt linkage.
const refreshed = await client.post(`/api/v1/queue/tasks/${created.id}/refresh`, {}) as QueueTaskRecord;
assert.equal(refreshed.state, "completed");
assert.equal(refreshed.latestAttempt?.state, "completed");
assert.equal(refreshed.latestAttempt?.runId, dispatched.run.id);
assert.equal(refreshed.latestAttempt?.sessionPath, "/api/v1/sessions/sess_queue_q2_dispatch_selftest");
// Only one Job has been created so far, so the recorded manifest belongs to this run.
const dispatchManifest = JSON.parse(await readFile(createdManifest, "utf8")) as JsonRecord;
assert.ok(JSON.stringify(dispatchManifest).includes(dispatched.run.id));
// --- Scenario 2: task that also requests a unidesk-ssh tool credential. ---
const unideskCreated = await client.post("/api/v1/queue/tasks", {
tenantId: "unidesk",
projectId: "pikasTech/unidesk",
queue: "dev",
lane: "q2",
title: "Q2 queue unidesk ssh dispatch task",
priority: 22,
backendProfile: "codex",
providerId: "G14",
workspaceRef: { kind: "host-path", path: context.workspace },
sessionRef: { sessionId: "sess_queue_q2_unidesk_ssh_selftest", metadata: { source: "queue-q2-unidesk-ssh-self-test" } },
executionPolicy: {
sandbox: "workspace-write",
approval: "never",
timeoutMs: 15_000,
network: "default",
secretScope: {
allowCredentialEcho: false,
providerCredentials: [{ profile: "codex", secretRef: { name: "agentrun-v01-provider-codex", keys: ["auth.json", "config.toml"], mountPath: context.codexHome } }],
toolCredentials: [{
tool: "unidesk-ssh",
purpose: "ssh-passthrough-readonly",
secretRef: { name: "agentrun-v01-tool-unidesk-ssh", keys: ["UNIDESK_SSH_CLIENT_TOKEN"] },
projection: { kind: "env", envName: "UNIDESK_SSH_CLIENT_TOKEN", secretKey: "UNIDESK_SSH_CLIENT_TOKEN" },
}],
},
},
resourceBundleRef: null,
payload: { prompt: "queue unidesk ssh dispatch hello" },
references: [{ kind: "issue", url: "https://github.com/pikasTech/agentrun/issues/112" }],
metadata: { source: "queue-q2-unidesk-ssh-self-test" },
idempotencyKey: "queue-q2-unidesk-ssh-self-test",
}) as QueueTaskRecord;
// The dispatch response must name the endpoint env var as transient env and report
// that values were not printed; in the Job manifest both env entries must be sourced
// via `valueFrom` (runnerEnvValue maps that to "secretRef") rather than literal values.
const unideskDispatched = await client.post(`/api/v1/queue/tasks/${unideskCreated.id}/dispatch`, { attemptId: "attempt_queue_q2_unidesk_ssh_selftest" }) as QueueDispatchResult;
assert.deepEqual((((unideskDispatched.runnerJob as JsonRecord).transientEnv as JsonRecord).names) as string[], ["UNIDESK_MAIN_SERVER_IP"]);
assert.equal((((unideskDispatched.runnerJob as JsonRecord).transientEnvSecret as JsonRecord).valuesPrinted), false);
const unideskManifest = JSON.parse(await readFile(createdManifest, "utf8")) as JsonRecord;
assert.equal(runnerEnvValue(unideskManifest, "UNIDESK_MAIN_SERVER_IP"), "secretRef");
assert.equal(runnerEnvValue(unideskManifest, "UNIDESK_SSH_CLIENT_TOKEN"), "secretRef");
assertNoSecretLeak(unideskDispatched);
// --- Scenario 3: command and run are both marked "blocked"; the task must follow. ---
const blockedCreated = await client.post("/api/v1/queue/tasks", {
tenantId: "hwlab",
projectId: "pikasTech/HWLAB",
queue: "dev",
lane: "q2",
title: "Q2 queue blocked task",
priority: 23,
backendProfile: "codex",
providerId: "G14",
workspaceRef: { kind: "host-path", path: context.workspace },
sessionRef: { sessionId: "sess_queue_q2_blocked_selftest", metadata: { source: "queue-q2-blocked-self-test" } },
executionPolicy: {
sandbox: "workspace-write",
approval: "never",
timeoutMs: 15_000,
network: "default",
secretScope: { allowCredentialEcho: false, providerCredentials: [{ profile: "codex", secretRef: { name: "agentrun-v01-provider-codex", keys: ["auth.json", "config.toml"], mountPath: context.codexHome } }] },
},
resourceBundleRef: null,
payload: { prompt: "queue blocked hello" },
references: [{ kind: "issue", url: "https://github.com/pikasTech/agentrun/issues/130" }],
metadata: { source: "queue-q2-blocked-self-test" },
idempotencyKey: "queue-q2-blocked-self-test",
}) as QueueTaskRecord;
const blockedDispatched = await client.post(`/api/v1/queue/tasks/${blockedCreated.id}/dispatch`, { attemptId: "attempt_queue_q2_blocked_selftest" }) as QueueDispatchResult;
await client.patch(`/api/v1/commands/${blockedDispatched.command.id}/status`, { terminalStatus: "blocked", failureKind: "required-skill-unavailable", failureMessage: "missing required skill" });
await client.patch(`/api/v1/runs/${blockedDispatched.run.id}/status`, { terminalStatus: "blocked", failureKind: "required-skill-unavailable", failureMessage: "missing required skill" });
const blockedShown = await client.get(`/api/v1/queue/tasks/${blockedCreated.id}`) as QueueTaskRecord;
assert.equal(blockedShown.state, "blocked");
assert.equal(blockedShown.latestAttempt?.state, "blocked");
// --- Scenario 4: cancel a running task and verify command, run and session all end cancelled. ---
const cancelCreated = await client.post("/api/v1/queue/tasks", {
tenantId: "unidesk",
projectId: "pikasTech/unidesk",
queue: "dev",
lane: "q2",
title: "Q2 queue cancel task",
priority: 21,
backendProfile: "codex",
providerId: "G14",
workspaceRef: { kind: "host-path", path: context.workspace },
sessionRef: { sessionId: "sess_queue_q2_cancel_selftest", metadata: { source: "queue-q2-cancel-self-test" } },
executionPolicy: {
sandbox: "workspace-write",
approval: "never",
timeoutMs: 15_000,
network: "default",
secretScope: { allowCredentialEcho: false, providerCredentials: [{ profile: "codex", secretRef: { name: "agentrun-v01-provider-codex", keys: ["auth.json", "config.toml"], mountPath: context.codexHome } }] },
},
resourceBundleRef: null,
payload: { prompt: "queue cancel hello" },
references: [{ kind: "issue", url: "https://github.com/pikasTech/agentrun/issues/105" }],
metadata: { source: "queue-q2-cancel-self-test" },
idempotencyKey: "queue-q2-cancel-self-test",
}) as QueueTaskRecord;
const cancelDispatched = await client.post(`/api/v1/queue/tasks/${cancelCreated.id}/dispatch`, { attemptId: "attempt_queue_q2_cancel_selftest" }) as QueueDispatchResult;
const cancelled = await client.post(`/api/v1/queue/tasks/${cancelCreated.id}/cancel`, { reason: "self-test queue cancel propagation" }) as QueueTaskRecord;
assert.equal(cancelled.state, "cancelled");
assert.equal(cancelled.latestAttempt?.state, "cancelled");
assert.equal(cancelled.latestAttempt?.runId, cancelDispatched.run.id);
assert.equal(cancelled.cancelReason, "self-test queue cancel propagation");
const cancelledCommand = await client.get(`/api/v1/runs/${cancelDispatched.run.id}/commands/${cancelDispatched.command.id}`) as { state?: string };
assert.equal(cancelledCommand.state, "cancelled");
const cancelledRun = await client.get(`/api/v1/runs/${cancelDispatched.run.id}`) as { status?: string; terminalStatus?: string; failureKind?: string };
assert.equal(cancelledRun.status, "cancelled");
assert.equal(cancelledRun.terminalStatus, "cancelled");
assert.equal(cancelledRun.failureKind, "cancelled");
// The session must be terminal with no active run or command left attached.
const cancelledSession = await client.get("/api/v1/sessions/sess_queue_q2_cancel_selftest") as { executionState?: string; terminalStatus?: string; failureKind?: string; activeRunId?: string | null; activeCommandId?: string | null };
assert.equal(cancelledSession.executionState, "terminal");
assert.equal(cancelledSession.terminalStatus, "cancelled");
assert.equal(cancelledSession.failureKind, "cancelled");
assert.equal(cancelledSession.activeRunId, null);
assert.equal(cancelledSession.activeCommandId, null);
// The fake kubectl overwrites the manifest file on each Job create, so at this
// point it holds the cancel scenario's Job (the last one dispatched).
const cancelManifest = JSON.parse(await readFile(createdManifest, "utf8")) as JsonRecord;
assert.ok(JSON.stringify(cancelManifest).includes(cancelDispatched.run.id));
assertNoSecretLeak(dispatched);
assertNoSecretLeak(cancelled);
return { name: "queue-q2-dispatch", tests: ["queue-dispatch-run-command-runner-job", "queue-read-views-refresh-terminal-state", "queue-refresh-from-core-status", "queue-dispatch-no-repeat", "queue-unidesk-ssh-endpoint-auto-env", "queue-blocked-run-state-wins-over-command-failed", "queue-cancel-propagates-to-run-command-session"] };
} finally {
// Always shut the HTTP server down, even when an assertion above throws.
await new Promise<void>((resolve) => server.server.close(() => resolve()));
}
};
export default selfTest;
/**
 * Looks up an environment variable on the first container of a Job manifest.
 *
 * Walks `spec.template.spec.containers[0].env` and finds the entry whose
 * `name` matches.
 *
 * @param manifest Parsed Job manifest object.
 * @param name Environment variable name to look for.
 * @returns `"secretRef"` when the entry has a truthy `valueFrom`, otherwise the
 *   entry's literal `value`. Returns `undefined` when the entry, or any part of
 *   the path leading to the env list, is missing.
 */
function runnerEnvValue(manifest: JsonRecord, name: string): unknown {
  const spec = manifest.spec as JsonRecord | undefined;
  const template = spec?.template as JsonRecord | undefined;
  const podSpec = template?.spec as JsonRecord | undefined;
  const containers = podSpec?.containers;
  // A missing or malformed path yields `undefined` instead of a TypeError, so a
  // caller's assertion fails with a readable expected/actual diff.
  if (!Array.isArray(containers)) return undefined;
  const env = (containers[0] as JsonRecord | undefined)?.env;
  if (!Array.isArray(env)) return undefined;
  const item = (env as JsonRecord[]).find((entry) => entry?.name === name);
  if (!item) return undefined;
  // The source of the reference is deliberately not inspected: any `valueFrom`
  // is reported with the same marker so callers never see the referenced value.
  if (item.valueFrom) return "secretRef";
  return item.value;
}