328 lines
20 KiB
TypeScript
328 lines
20 KiB
TypeScript
import assert from "node:assert/strict";
|
|
import { spawn } from "node:child_process";
|
|
import { chmod, readFile, writeFile } from "node:fs/promises";
|
|
import path from "node:path";
|
|
import { startManagerServer } from "../../mgr/server.js";
|
|
import { MemoryAgentRunStore } from "../../mgr/store.js";
|
|
import { ManagerClient } from "../../mgr/client.js";
|
|
import type { JsonRecord, QueueDispatchResult, QueueTaskRecord } from "../../common/types.js";
|
|
import { assertNoSecretLeak, loadArtificerImageRef, type SelfTestCase } from "../harness.js";
|
|
|
|
/**
 * Returns `data.next.confirm` of a CLI dry-run result, coerced to a string.
 *
 * The dry-run plans inspected below expose the follow-up command under that path;
 * the assertions only need substring checks on it.
 */
function nextConfirm(plan: JsonRecord): string {
  const data = plan.data as JsonRecord;
  const next = data.next as JsonRecord;
  return String(next.confirm);
}

/**
 * Self-test "queue-q2-dispatch".
 *
 * Starts a manager server backed by a `MemoryAgentRunStore` and a fake `kubectl`
 * script, then walks through:
 *   1. CLI dry-run plans for `queue submit` / `queue dispatch` (stdin JSON preferred over `--json-file`),
 *   2. a real dispatch and its read views (task, runner jobs, events, commander, refresh),
 *   3. a dispatch that carries a unidesk-ssh tool credential (env values must be secret references),
 *   4. a dispatch whose command and run are patched to `blocked`,
 *   5. a dispatch that is cancelled, checking propagation to command, run and session.
 *
 * The server is always closed in `finally`, whether or not an assertion fails.
 */
const selfTest: SelfTestCase = async (context) => {
  const fakeKubectl = path.join(context.tmp, "fake-kubectl-queue-q2.js");
  const createdManifest = path.join(context.tmp, "created-queue-q2-runner-job.json");

  // Fake kubectl: `create` echoes synthetic metadata and stores the most recent Job manifest
  // in `createdManifest`; `patch secret` / `delete secret` succeed; anything else exits 1.
  await writeFile(fakeKubectl, `#!/usr/bin/env bun
const args = Bun.argv.slice(2);
async function readStdin() {
  const chunks = [];
  for await (const chunk of Bun.stdin.stream()) chunks.push(chunk);
  return Buffer.concat(chunks.map((chunk) => Buffer.from(chunk))).toString("utf8");
}
if (args[0] === "create") {
  const text = await readStdin();
  const manifest = JSON.parse(text);
  if (manifest.kind === "Job") await Bun.write(${JSON.stringify(createdManifest)}, text);
  const uid = manifest.kind === "Job" ? "job-uid-queue-q2" : "secret-uid-queue-q2";
  console.log(JSON.stringify({ apiVersion: manifest.apiVersion, kind: manifest.kind, metadata: { uid, resourceVersion: "1", name: manifest.metadata.name, namespace: manifest.metadata.namespace } }));
  process.exit(0);
}
if (args[0] === "patch" && args[1] === "secret") {
  console.log(JSON.stringify({ apiVersion: "v1", kind: "Secret", metadata: { uid: "secret-uid-queue-q2", resourceVersion: "2", name: args[2], namespace: args[4] } }));
  process.exit(0);
}
if (args[0] === "delete" && args[1] === "secret") {
  console.log(JSON.stringify({ kind: "Status", status: "Success" }));
  process.exit(0);
}
console.error("unsupported fake kubectl args: " + args.join(" "));
process.exit(1);
`);
  await chmod(fakeKubectl, 0o755);

  const store = new MemoryAgentRunStore();
  const artificerImageRef = await loadArtificerImageRef(context.root);
  const aipodImageRef = {
    kind: "env-image-dockerfile",
    repoUrl: "git@github.com:pikasTech/agentrun.git",
    commitId: artificerImageRef.commitId,
    dockerfilePath: "deploy/container/Containerfile",
  };

  // All four queue tasks reference the same codex provider credential; a fresh literal is
  // built per request so no array instance is shared between request bodies.
  const codexProviderCredentials = () => [{ profile: "codex", secretRef: { name: "agentrun-v01-provider-codex", keys: ["auth.json", "config.toml"], mountPath: context.codexHome } }];

  const server = await startManagerServer({
    port: 0,
    host: "127.0.0.1",
    sourceCommit: "self-test",
    store,
    runnerJobDefaults: {
      namespace: "agentrun-v01",
      managerUrl: "http://agentrun-mgr.agentrun-v01.svc.cluster.local:8080",
      image: "127.0.0.1:5000/agentrun/agentrun-mgr@sha256:1111111111111111111111111111111111111111111111111111111111111111",
      envIdentity: "selftest-env-identity",
      kubectlCommand: fakeKubectl,
      unideskSshEndpointEnv: { name: "UNIDESK_MAIN_SERVER_IP", value: "https://unidesk.default.example.test" },
    },
  });

  try {
    const client = new ManagerClient(server.baseUrl);

    // --- CLI: `queue submit --json-stdin --dry-run` -------------------------------------------
    const stdinSubmitPlan = await runCliJson(context, server.baseUrl, ["queue", "submit", "--json-stdin", "--dry-run", "--idempotency-key", "queue-q2-cli-stdin-dry-run"], {
      tenantId: "unidesk",
      projectId: "pikasTech/unidesk",
      queue: "dev",
      title: "stdin queue dry-run task",
      payload: { prompt: "stdin queue dry-run" },
    });
    assert.equal(stdinSubmitPlan.ok, true);
    assert.equal((stdinSubmitPlan.data as JsonRecord).action, "queue-submit-plan");
    assert.equal(((stdinSubmitPlan.data as JsonRecord).jsonInput as JsonRecord).preferred, "--json-stdin");
    const submitConfirm = nextConfirm(stdinSubmitPlan);
    assert.equal(submitConfirm.includes("--json-stdin"), true);
    assert.equal(submitConfirm.includes("--idempotency-key <idempotency-key>"), true);
    assert.equal(submitConfirm.includes("--json-file"), false);

    // --- CLI: help text must list stdin before file for `queue submit` -------------------------
    const help = await runCliJson(context, server.baseUrl, ["help"]);
    const commands = ((help.data as JsonRecord).commands as string[]) ?? [];
    assert.equal(commands.some((item) => item.includes("queue submit --json-stdin|--json-file")), true);
    assert.equal(commands.some((item) => item.includes("queue submit --json-file <task.json>|--json-stdin")), false);

    // --- Main task: create, dry-run plans, then dispatch for real ------------------------------
    const created = await client.post("/api/v1/queue/tasks", {
      tenantId: "unidesk",
      projectId: "pikasTech/unidesk",
      queue: "dev",
      lane: "q2",
      title: "Q2 queue dispatch task",
      priority: 20,
      backendProfile: "codex",
      providerId: "G14",
      workspaceRef: { kind: "host-path", path: context.workspace },
      sessionRef: { sessionId: "sess_queue_q2_dispatch_selftest", metadata: { source: "queue-q2-self-test" } },
      executionPolicy: {
        sandbox: "workspace-write",
        approval: "never",
        timeoutMs: 15_000,
        network: "default",
        secretScope: { allowCredentialEcho: false, providerCredentials: codexProviderCredentials() },
      },
      resourceBundleRef: null,
      payload: { prompt: "queue dispatch hello" },
      references: [{ kind: "issue", url: "https://github.com/pikasTech/agentrun/issues/39" }],
      metadata: { source: "queue-q2-self-test", aipodImageRef },
      idempotencyKey: "queue-q2-dispatch-self-test",
    }) as QueueTaskRecord;

    const dispatchPlan = await runCliJson(context, server.baseUrl, ["queue", "dispatch", String(created.id), "--dry-run", "--attempt-id", "attempt_queue_q2_cli_dryrun"]);
    assert.equal(dispatchPlan.ok, true);
    assert.equal((dispatchPlan.data as JsonRecord).action, "queue-dispatch-plan");
    const dispatchConfirm = nextConfirm(dispatchPlan);
    assert.equal(dispatchConfirm.includes("--json-file"), false);
    assert.equal(dispatchConfirm.includes("--attempt-id <attempt-id>"), true);

    const dispatchStdinPlan = await runCliJson(context, server.baseUrl, ["queue", "dispatch", String(created.id), "--json-stdin", "--dry-run"], { attemptId: "attempt_queue_q2_cli_stdin_dryrun" });
    assert.equal(dispatchStdinPlan.ok, true);
    const dispatchStdinConfirm = nextConfirm(dispatchStdinPlan);
    assert.equal(dispatchStdinConfirm.includes("--json-stdin"), true);
    assert.equal(dispatchStdinConfirm.includes("--json-file"), false);

    const dispatched = await client.post(`/api/v1/queue/tasks/${created.id}/dispatch`, { attemptId: "attempt_queue_q2_selftest" }) as QueueDispatchResult;
    assert.equal(dispatched.action, "queue-dispatch");
    assert.equal(dispatched.mutation, true);
    assert.equal((dispatched.envImage as JsonRecord).status, "runtime-default-reused");
    assert.equal((dispatched.envImage as JsonRecord).digestPinned, true);
    assert.equal((dispatched.runnerJob as JsonRecord).image, "127.0.0.1:5000/agentrun/agentrun-mgr@sha256:1111111111111111111111111111111111111111111111111111111111111111");
    assert.equal((((dispatched.runnerJob as JsonRecord).envImage as JsonRecord).imageRef as JsonRecord).dockerfilePath, "deploy/container/Containerfile");
    assert.equal((dispatched.workReady as JsonRecord).valuesPrinted, false);
    assert.ok((((dispatched.workReady as JsonRecord).requiredImageTools as string[]) ?? []).includes("npm"));
    assert.equal(dispatched.latestAttempt.attemptId, "attempt_queue_q2_selftest");
    assert.equal(dispatched.latestAttempt.runId, dispatched.run.id);
    assert.equal(dispatched.latestAttempt.commandId, dispatched.command.id);
    assert.ok(dispatched.latestAttempt.runnerJobId);
    assert.equal(dispatched.latestAttempt.sessionId, "sess_queue_q2_dispatch_selftest");
    assert.equal(dispatched.latestAttempt.sessionPath, "/api/v1/sessions/sess_queue_q2_dispatch_selftest");
    assert.equal(dispatched.task.state, "running");
    assert.equal(dispatched.task.latestAttempt?.attemptId, "attempt_queue_q2_selftest");
    assert.equal(dispatched.task.sessionPath, "/api/v1/sessions/sess_queue_q2_dispatch_selftest");
    assert.equal(dispatched.run.sessionRef?.sessionId, "sess_queue_q2_dispatch_selftest");

    // --- Read views while the task is running --------------------------------------------------
    const shown = await client.get(`/api/v1/queue/tasks/${created.id}`) as QueueTaskRecord;
    assert.equal(shown.state, "running");
    assert.equal(shown.latestAttempt?.runId, dispatched.run.id);
    assert.equal(shown.latestAttempt?.commandId, dispatched.command.id);
    assert.equal(shown.latestAttempt?.runnerJobId, dispatched.latestAttempt.runnerJobId);

    const jobs = await client.get(`/api/v1/runs/${dispatched.run.id}/runner-jobs?commandId=${dispatched.command.id}`) as { items?: JsonRecord[]; count?: number };
    assert.equal(jobs.count, 1);
    assert.equal(jobs.items?.[0]?.attemptId, "attempt_queue_q2_selftest");
    const events = await client.get(`/api/v1/runs/${dispatched.run.id}/events?afterSeq=0&limit=100`) as { items?: JsonRecord[] };
    assert.ok(events.items?.some((item) => (item.payload as JsonRecord).phase === "queue-dispatched"));

    // A second dispatch of the same task must be rejected with a "not pending" error.
    await assert.rejects(
      () => client.post(`/api/v1/queue/tasks/${created.id}/dispatch`, { attemptId: "attempt_queue_q2_duplicate" }),
      (error) => error instanceof Error && error.message.includes("not pending"),
    );

    // --- Completing the command must show up in every queue read view --------------------------
    await client.patch(`/api/v1/commands/${dispatched.command.id}/status`, { terminalStatus: "completed", failureKind: null, failureMessage: null });
    const autoShown = await client.get(`/api/v1/queue/tasks/${created.id}`) as QueueTaskRecord;
    assert.equal(autoShown.state, "completed");
    assert.equal(autoShown.latestAttempt?.state, "completed");
    assert.equal(autoShown.latestAttempt?.runId, dispatched.run.id);
    const commander = await client.get("/api/v1/queue/commander?queue=dev&readerId=queue-q2-self-test") as { items?: QueueTaskRecord[]; stats?: JsonRecord };
    const commanderTask = commander.items?.find((item) => item.id === created.id);
    assert.equal(commanderTask?.state, "completed");
    assert.equal(commanderTask?.latestAttempt?.state, "completed");
    assert.equal((commander.stats?.byState as JsonRecord).completed, 1);
    const refreshed = await client.post(`/api/v1/queue/tasks/${created.id}/refresh`, {}) as QueueTaskRecord;
    assert.equal(refreshed.state, "completed");
    assert.equal(refreshed.latestAttempt?.state, "completed");
    assert.equal(refreshed.latestAttempt?.runId, dispatched.run.id);
    assert.equal(refreshed.latestAttempt?.sessionPath, "/api/v1/sessions/sess_queue_q2_dispatch_selftest");
    // The fake kubectl keeps only the latest Job manifest, so read it before the next dispatch.
    const dispatchManifest = JSON.parse(await readFile(createdManifest, "utf8")) as JsonRecord;
    assert.ok(JSON.stringify(dispatchManifest).includes(dispatched.run.id));

    // --- Task with a unidesk-ssh tool credential -----------------------------------------------
    const unideskCreated = await client.post("/api/v1/queue/tasks", {
      tenantId: "unidesk",
      projectId: "pikasTech/unidesk",
      queue: "dev",
      lane: "q2",
      title: "Q2 queue unidesk ssh dispatch task",
      priority: 22,
      backendProfile: "codex",
      providerId: "G14",
      workspaceRef: { kind: "host-path", path: context.workspace },
      sessionRef: { sessionId: "sess_queue_q2_unidesk_ssh_selftest", metadata: { source: "queue-q2-unidesk-ssh-self-test" } },
      executionPolicy: {
        sandbox: "workspace-write",
        approval: "never",
        timeoutMs: 15_000,
        network: "default",
        secretScope: {
          allowCredentialEcho: false,
          providerCredentials: codexProviderCredentials(),
          toolCredentials: [{
            tool: "unidesk-ssh",
            purpose: "ssh-passthrough-readonly",
            secretRef: { name: "agentrun-v01-tool-unidesk-ssh", keys: ["UNIDESK_SSH_CLIENT_TOKEN"] },
            projection: { kind: "env", envName: "UNIDESK_SSH_CLIENT_TOKEN", secretKey: "UNIDESK_SSH_CLIENT_TOKEN" },
          }],
        },
      },
      resourceBundleRef: null,
      payload: { prompt: "queue unidesk ssh dispatch hello" },
      references: [{ kind: "issue", url: "https://github.com/pikasTech/agentrun/issues/112" }],
      metadata: { source: "queue-q2-unidesk-ssh-self-test" },
      idempotencyKey: "queue-q2-unidesk-ssh-self-test",
    }) as QueueTaskRecord;
    const unideskDispatched = await client.post(`/api/v1/queue/tasks/${unideskCreated.id}/dispatch`, { attemptId: "attempt_queue_q2_unidesk_ssh_selftest" }) as QueueDispatchResult;
    assert.deepEqual(((unideskDispatched.runnerJob as JsonRecord).transientEnv as JsonRecord).names as string[], ["UNIDESK_MAIN_SERVER_IP"]);
    assert.equal(((unideskDispatched.runnerJob as JsonRecord).transientEnvSecret as JsonRecord).valuesPrinted, false);
    // Both env entries must be injected via `valueFrom`, never as literal values in the Job manifest.
    const unideskManifest = JSON.parse(await readFile(createdManifest, "utf8")) as JsonRecord;
    assert.equal(runnerEnvValue(unideskManifest, "UNIDESK_MAIN_SERVER_IP"), "secretRef");
    assert.equal(runnerEnvValue(unideskManifest, "UNIDESK_SSH_CLIENT_TOKEN"), "secretRef");
    assertNoSecretLeak(unideskDispatched);

    // --- Task whose command and run are reported as blocked ------------------------------------
    const blockedCreated = await client.post("/api/v1/queue/tasks", {
      tenantId: "hwlab",
      projectId: "pikasTech/HWLAB",
      queue: "dev",
      lane: "q2",
      title: "Q2 queue blocked task",
      priority: 23,
      backendProfile: "codex",
      providerId: "G14",
      workspaceRef: { kind: "host-path", path: context.workspace },
      sessionRef: { sessionId: "sess_queue_q2_blocked_selftest", metadata: { source: "queue-q2-blocked-self-test" } },
      executionPolicy: {
        sandbox: "workspace-write",
        approval: "never",
        timeoutMs: 15_000,
        network: "default",
        secretScope: { allowCredentialEcho: false, providerCredentials: codexProviderCredentials() },
      },
      resourceBundleRef: null,
      payload: { prompt: "queue blocked hello" },
      references: [{ kind: "issue", url: "https://github.com/pikasTech/agentrun/issues/130" }],
      metadata: { source: "queue-q2-blocked-self-test" },
      idempotencyKey: "queue-q2-blocked-self-test",
    }) as QueueTaskRecord;
    const blockedDispatched = await client.post(`/api/v1/queue/tasks/${blockedCreated.id}/dispatch`, { attemptId: "attempt_queue_q2_blocked_selftest" }) as QueueDispatchResult;
    await client.patch(`/api/v1/commands/${blockedDispatched.command.id}/status`, { terminalStatus: "blocked", failureKind: "required-skill-unavailable", failureMessage: "missing required skill" });
    await client.patch(`/api/v1/runs/${blockedDispatched.run.id}/status`, { terminalStatus: "blocked", failureKind: "required-skill-unavailable", failureMessage: "missing required skill" });
    const blockedShown = await client.get(`/api/v1/queue/tasks/${blockedCreated.id}`) as QueueTaskRecord;
    assert.equal(blockedShown.state, "blocked");
    assert.equal(blockedShown.latestAttempt?.state, "blocked");

    // --- Task that is cancelled after dispatch -------------------------------------------------
    const cancelCreated = await client.post("/api/v1/queue/tasks", {
      tenantId: "unidesk",
      projectId: "pikasTech/unidesk",
      queue: "dev",
      lane: "q2",
      title: "Q2 queue cancel task",
      priority: 21,
      backendProfile: "codex",
      providerId: "G14",
      workspaceRef: { kind: "host-path", path: context.workspace },
      sessionRef: { sessionId: "sess_queue_q2_cancel_selftest", metadata: { source: "queue-q2-cancel-self-test" } },
      executionPolicy: {
        sandbox: "workspace-write",
        approval: "never",
        timeoutMs: 15_000,
        network: "default",
        secretScope: { allowCredentialEcho: false, providerCredentials: codexProviderCredentials() },
      },
      resourceBundleRef: null,
      payload: { prompt: "queue cancel hello" },
      references: [{ kind: "issue", url: "https://github.com/pikasTech/agentrun/issues/105" }],
      metadata: { source: "queue-q2-cancel-self-test" },
      idempotencyKey: "queue-q2-cancel-self-test",
    }) as QueueTaskRecord;
    const cancelDispatched = await client.post(`/api/v1/queue/tasks/${cancelCreated.id}/dispatch`, { attemptId: "attempt_queue_q2_cancel_selftest" }) as QueueDispatchResult;
    const cancelled = await client.post(`/api/v1/queue/tasks/${cancelCreated.id}/cancel`, { reason: "self-test queue cancel propagation" }) as QueueTaskRecord;
    assert.equal(cancelled.state, "cancelled");
    assert.equal(cancelled.latestAttempt?.state, "cancelled");
    assert.equal(cancelled.latestAttempt?.runId, cancelDispatched.run.id);
    assert.equal(cancelled.cancelReason, "self-test queue cancel propagation");
    const cancelledCommand = await client.get(`/api/v1/runs/${cancelDispatched.run.id}/commands/${cancelDispatched.command.id}`) as { state?: string };
    assert.equal(cancelledCommand.state, "cancelled");
    const cancelledRun = await client.get(`/api/v1/runs/${cancelDispatched.run.id}`) as { status?: string; terminalStatus?: string; failureKind?: string };
    assert.equal(cancelledRun.status, "cancelled");
    assert.equal(cancelledRun.terminalStatus, "cancelled");
    assert.equal(cancelledRun.failureKind, "cancelled");
    const cancelledSession = await client.get("/api/v1/sessions/sess_queue_q2_cancel_selftest") as { executionState?: string; terminalStatus?: string; failureKind?: string; activeRunId?: string | null; activeCommandId?: string | null };
    assert.equal(cancelledSession.executionState, "terminal");
    assert.equal(cancelledSession.terminalStatus, "cancelled");
    assert.equal(cancelledSession.failureKind, "cancelled");
    assert.equal(cancelledSession.activeRunId, null);
    assert.equal(cancelledSession.activeCommandId, null);
    const cancelManifest = JSON.parse(await readFile(createdManifest, "utf8")) as JsonRecord;
    assert.ok(JSON.stringify(cancelManifest).includes(cancelDispatched.run.id));

    assertNoSecretLeak(dispatched);
    assertNoSecretLeak(cancelled);

    return { name: "queue-q2-dispatch", tests: ["queue-cli-json-stdin-dry-run", "queue-dispatch-run-command-runner-job", "queue-read-views-refresh-terminal-state", "queue-refresh-from-core-status", "queue-dispatch-no-repeat", "queue-unidesk-ssh-endpoint-auto-env", "queue-blocked-run-state-wins-over-command-failed", "queue-cancel-propagates-to-run-command-session"] };
  } finally {
    // Always release the listening socket, even when an assertion above throws.
    await new Promise<void>((resolve) => server.server.close(() => resolve()));
  }
};

export default selfTest;
|
|
|
|
/**
 * Runs `scripts/agentrun-cli.ts` (under `context.root`) as a child process of the
 * current runtime and returns its stdout parsed as JSON.
 *
 * @param context    Only `root` is used, to locate the CLI script.
 * @param managerUrl Passed to the CLI via `--manager-url`.
 * @param args       Extra CLI arguments appended after `--manager-url <url>`.
 * @param stdinJson  When provided, serialized with `JSON.stringify` and written to the child's stdin.
 * @returns The parsed stdout of the CLI.
 * @throws If the process cannot be spawned, exits with a non-zero code (the assertion
 *         message carries stderr, or stdout when stderr is empty), or prints non-JSON output.
 */
async function runCliJson(context: { root: string }, managerUrl: string, args: string[], stdinJson?: JsonRecord): Promise<JsonRecord> {
  const proc = spawn(process.execPath, [`${context.root}/scripts/agentrun-cli.ts`, "--manager-url", managerUrl, ...args], { stdio: ["pipe", "pipe", "pipe"] });

  // A failed spawn is reported through the "error" event; without a listener it would be
  // thrown as an uncaught exception instead of failing this call.
  const exitCode = new Promise<number | null>((resolve, reject) => {
    proc.once("error", reject);
    proc.once("close", (code) => resolve(code));
  });

  // If the child exits before reading its stdin the pipe emits an error (e.g. EPIPE).
  // That condition is already reported through the exit-code assertion below, so the
  // stream error itself is deliberately ignored rather than crashing the process.
  proc.stdin.on("error", () => {});
  if (stdinJson !== undefined) proc.stdin.write(JSON.stringify(stdinJson));
  proc.stdin.end();

  const [stdout, stderr, code] = await Promise.all([readStream(proc.stdout), readStream(proc.stderr), exitCode]);
  assert.equal(code, 0, stderr || stdout);

  try {
    return JSON.parse(stdout) as JsonRecord;
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    throw new Error(`agentrun-cli ${args.join(" ")} did not print valid JSON (${reason}): ${stdout || stderr}`);
  }
}
|
|
|
|
/**
 * Collects everything a readable stream produces and returns it as one UTF-8 string.
 *
 * Chunks are gathered as Buffers and decoded once at the end, so a multi-byte
 * character split across two chunks is still decoded correctly.
 *
 * @param stream Readable stream to drain; string chunks are converted with `Buffer.from`.
 * @returns The concatenated stream contents decoded as UTF-8 (`""` for an empty stream).
 * @throws Rejects with the stream's error if it fails before ending.
 */
async function readStream(stream: NodeJS.ReadableStream): Promise<string> {
  const chunks: Buffer[] = [];
  // Async iteration finishes on "end" and throws on stream errors, so this cannot be left
  // waiting on an "end" event that never arrives after the stream was torn down.
  for await (const chunk of stream) {
    chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
  }
  return Buffer.concat(chunks).toString("utf8");
}
|
|
|
|
/**
 * Looks up an environment variable on the first container of a Job-style manifest
 * (`spec.template.spec.containers[0].env`).
 *
 * @param manifest Parsed manifest object.
 * @param name     Environment variable name to look for.
 * @returns `"secretRef"` when the entry has a truthy `valueFrom` (any kind of `valueFrom`
 *          is reported this way), the entry's literal `value` otherwise, or `undefined`
 *          when the variable, or any level of the path leading to it, is missing.
 */
function runnerEnvValue(manifest: JsonRecord, name: string): unknown {
  const spec = manifest.spec as JsonRecord | undefined;
  const template = spec?.template as JsonRecord | undefined;
  const podSpec = template?.spec as JsonRecord | undefined;

  // Return undefined for an incomplete manifest instead of throwing a TypeError, so the
  // calling assertion fails with a readable "expected vs actual" message.
  const containers = podSpec?.containers;
  if (!Array.isArray(containers)) return undefined;
  const firstContainer = containers[0] as JsonRecord | undefined;
  const env = firstContainer?.env;
  if (!Array.isArray(env)) return undefined;

  const item = (env as JsonRecord[]).find((entry) => entry.name === name);
  if (!item) return undefined;
  if (item.valueFrom) return "secretRef";
  return item.value;
}
|