fix(code-queue): add workdirs contract checks
This commit is contained in:
@@ -19,6 +19,7 @@ Each commit CI run performs:
|
||||
- `git clone` and checkout of the requested repository revision.
|
||||
- `bun install --frozen-lockfile` at the repo root and `src/`, because `bun scripts/cli.ts check` compiles all `src/components` and needs the component workspace lockfile for frontend React dependencies.
|
||||
- `UNIDESK_D601_RUST_CHECK=1 bun scripts/cli.ts check --full --rust`, so Rust backend-core is checked only inside the D601 CI execution boundary.
|
||||
- `code-queue-mgr` `/api/workdirs` contract smoke against a temporary PostgreSQL instance, asserting the stable manager route returns `ok: true` and includes the controlled `/workspace` option.
|
||||
- Backend-core production artifact publication when enabled: build the requested pushed commit on D601, stamp Docker image labels, and push a commit-pinned image to the D601 loopback artifact registry.
|
||||
- Temporary `code-queue-ci-read` Deployment and ClusterIP Service in `unidesk-ci`.
|
||||
- Code Queue read performance checks against the production PostgreSQL through `d601-tcp-egress-gateway`.
|
||||
@@ -66,7 +67,7 @@ The artifact registry contract and CD consumption path are defined in `docs/refe
|
||||
|
||||
`ci run-dev-e2e` is the manual dev desired-state smoke flow. The single authoritative reference for its Git-controlled runner script, short launcher, result directory and no-CD boundary is `docs/reference/dev-ci-runner.md`.
|
||||
|
||||
The current dev namespace e2e is a harness and smoke gate, not a full frontend/backend stack rollout. It does include a controlled Code Queue slice: D601 builds or reuses the `environments.dev.services[].id=code-queue` commit, imports the image into native k3s containerd, starts temporary PostgreSQL plus Code Queue scheduler/read/write Services in `unidesk-ci-e2e-<runId>`, and verifies the HTTP API through the Kubernetes API service proxy. This remains CI-only and must not deploy persistent `unidesk-dev` or production resources.
|
||||
The current dev namespace e2e is a harness and smoke gate, not a full frontend/backend stack rollout. It does include a controlled Code Queue slice: D601 builds or reuses the `environments.dev.services[].id=code-queue` commit, imports the image into native k3s containerd, starts temporary PostgreSQL plus Code Queue scheduler/read/write Services in `unidesk-ci-e2e-<runId>`, and verifies the HTTP API through the Kubernetes API service proxy. The stable frontend/backend path `/api/microservices/code-queue/proxy/api/workdirs` is covered by the normal UniDesk e2e check `microservice:code-queue-workdirs`, while the CI repo check separately starts `code-queue-mgr` against temporary PostgreSQL and asserts its `/api/workdirs` contract. This remains CI-only and must not deploy persistent `unidesk-dev` or production resources.
|
||||
|
||||
## Performance Gate
|
||||
|
||||
|
||||
@@ -102,7 +102,7 @@ scripts/ci/dev-e2e.sh \
|
||||
|
||||
The current script creates a Tekton `PipelineRun` for `pipeline/unidesk-dev-namespace-e2e`, stores the generated PipelineRun name in `pipelinerun.txt`, and writes a final `result.json` with `ok`, `status`, `runId`, `manifestCommit`, `pipelineRun`, `temporaryNamespace` and `finishedAt`.
|
||||
|
||||
The Tekton task creates a temporary namespace `unidesk-ci-e2e-<runId>` and may create only CI-owned smoke resources there: `postgres-dev`, `code-queue-scheduler-dev`, `code-queue-read-dev`, `code-queue-write-dev`, their ClusterIP Services and a per-run Secret/ConfigMap. It must not mutate `unidesk` or persistent `unidesk-dev`. Code Queue API validation must use ClusterIP Services and the Kubernetes API `services/.../proxy` subresource; NodePort, D601 host ports and direct public service exposure are forbidden. The smoke currently proves `/health`, `/live` and `/api/workdirs` GET/POST/DELETE on read/write/scheduler roles, giving follow-up Code Queue API fixes a reproducible CI target before production rollout.
|
||||
The Tekton task creates a temporary namespace `unidesk-ci-e2e-<runId>` and may create only CI-owned smoke resources there: `postgres-dev`, `code-queue-scheduler-dev`, `code-queue-read-dev`, `code-queue-write-dev`, their ClusterIP Services and a per-run Secret/ConfigMap. It must not mutate `unidesk` or persistent `unidesk-dev`. Code Queue API validation must use ClusterIP Services and the Kubernetes API `services/.../proxy` subresource; NodePort, D601 host ports and direct public service exposure are forbidden. The smoke currently proves `/health`, `/live` and `/api/workdirs` GET/POST/DELETE on read/write/scheduler roles, giving follow-up Code Queue API fixes a reproducible CI target before production rollout. The stable frontend/backend proxy contract is checked by `microservice:code-queue-workdirs` in the normal UniDesk e2e harness and by the repo-check `code-queue-mgr` `/api/workdirs` smoke.
|
||||
|
||||
## Commands
|
||||
|
||||
|
||||
+1
-1
@@ -110,7 +110,7 @@ function numberOption(args: string[], name: string, fallback: number): number {
|
||||
|
||||
function requireRevision(value: string | null): string {
|
||||
if (value === null || value.length === 0) throw new Error("ci run requires --revision <commit-or-ref>");
|
||||
if (!/^[A-Za-z0-9._/@:-]{1,160}$/u.test(value)) throw new Error("ci --revision contains unsupported characters");
|
||||
if (!/^[A-Za-z0-9._/@:-]{1,160}$/u.test(value) || value.startsWith("-") || value.includes("..")) throw new Error("ci --revision contains unsupported characters");
|
||||
return value;
|
||||
}
|
||||
|
||||
|
||||
+30
-1
@@ -93,6 +93,7 @@ const SERVICE_CHECK_NAMES = [
|
||||
"microservice:todo-note-write-path",
|
||||
"microservice:code-queue-status",
|
||||
"microservice:code-queue-health",
|
||||
"microservice:code-queue-workdirs",
|
||||
"microservice:code-queue-tasks",
|
||||
"microservice:decision-center-status",
|
||||
"microservice:decision-center-health",
|
||||
@@ -137,6 +138,7 @@ const FRONTEND_CHECK_NAMES = [
|
||||
"frontend:oa-event-flow-visible",
|
||||
"frontend:decision-center-visible",
|
||||
"frontend:code-queue-integrated-visible",
|
||||
"frontend:code-queue-workdirs-loaded",
|
||||
"frontend:code-queue-enqueue-await-smoke",
|
||||
"frontend:code-queue-summary-mobile-wrap",
|
||||
"frontend:code-queue-long-prompt-observation",
|
||||
@@ -1235,6 +1237,9 @@ async function serviceChecks(config: UniDeskConfig, urls: PublicUrls, checks: E2
|
||||
const k3sctlControlPlane = dockerCoreJson("/api/microservices/k3sctl-adapter/proxy/api/control-plane");
|
||||
const codeQueueStatus = dockerCoreJson("/api/microservices/code-queue/status");
|
||||
const codeQueueHealth = dockerCoreJson("/api/microservices/code-queue/health");
|
||||
const codeQueueWorkdirs = wantsCheck(options, "microservice:code-queue-workdirs")
|
||||
? dockerCoreJson("/api/microservices/code-queue/proxy/api/workdirs")
|
||||
: null;
|
||||
const codeQueueTasks = dockerCoreJson("/api/microservices/code-queue/proxy/api/tasks/overview?limit=5&transcriptLimit=1&compact=1&afterSeq=0&preferId=");
|
||||
const decisionCenterStatus = dockerCoreJson("/api/microservices/decision-center/status");
|
||||
const decisionCenterHealth = dockerCoreJson("/api/microservices/decision-center/health");
|
||||
@@ -1313,6 +1318,7 @@ async function serviceChecks(config: UniDeskConfig, urls: PublicUrls, checks: E2
|
||||
const oaEventFlowPipelineEventsBody = (oaEventFlowPipelineEvents as { body?: { ok?: boolean; events?: Array<{ tags?: unknown[]; sourceId?: string; type?: string; payload?: { runId?: string; pipelineId?: string } }>; returned?: number } }).body;
|
||||
const oaEventFlowStatsBody = (oaEventFlowStats as { body?: { ok?: boolean; stats?: unknown[]; returned?: number } }).body;
|
||||
const codeQueueHealthBody = (codeQueueHealth as { body?: { ok?: boolean; egressProxy?: { connected?: boolean }; queue?: { defaultModel?: string; judgeConfigured?: boolean; modelReasoningEfforts?: Record<string, string> } } }).body;
|
||||
const codeQueueWorkdirsBody = (codeQueueWorkdirs as { body?: { ok?: boolean; workdirs?: Array<{ path?: string; providerId?: string; executionMode?: string }>; defaultProviderId?: string; defaultWorkdir?: string } } | null)?.body;
|
||||
const codeQueueTasksBody = (codeQueueTasks as { body?: { ok?: boolean; queue?: { defaultModel?: string; modelReasoningEfforts?: Record<string, string> }; tasks?: unknown[] } }).body;
|
||||
const decisionCenterHealthBody = (decisionCenterHealth as { body?: { ok?: boolean; service?: string; storage?: string; schemaReady?: boolean; recordCount?: number; deploy?: { commit?: string } } }).body;
|
||||
const decisionCenterRecordsBody = (decisionCenterRecords as { body?: { ok?: boolean; records?: unknown[]; returned?: number } }).body;
|
||||
@@ -1500,6 +1506,14 @@ async function serviceChecks(config: UniDeskConfig, urls: PublicUrls, checks: E2
|
||||
addSelectedCheck(checks, options, "microservice:oa-event-flow-stats", (oaEventFlowStats as { ok?: boolean }).ok === true && oaEventFlowStatsBody?.ok === true && Array.isArray(oaEventFlowStatsBody.stats), oaEventFlowStats);
|
||||
addSelectedCheck(checks, options, "microservice:code-queue-status", (codeQueueStatus as { ok?: boolean }).ok === true && (codeQueueStatus as { body?: { microservice?: { id?: string; providerId?: string } } }).body?.microservice?.providerId === "D601", codeQueueStatus);
|
||||
addSelectedCheck(checks, options, "microservice:code-queue-health", (codeQueueHealth as { ok?: boolean }).ok === true && codeQueueHealthBody?.ok === true && codeQueueHealthBody.egressProxy?.connected === true && codeQueueHealthBody.queue?.defaultModel === "gpt-5.5" && codeQueueHealthBody.queue?.modelReasoningEfforts?.["gpt-5.5"] === "xhigh", codeQueueHealth);
|
||||
addSelectedCheck(checks, options, "microservice:code-queue-workdirs",
|
||||
(codeQueueWorkdirs as { ok?: boolean; status?: number } | null)?.ok === true
|
||||
&& (codeQueueWorkdirs as { status?: number } | null)?.status === 200
|
||||
&& codeQueueWorkdirsBody?.ok === true
|
||||
&& Array.isArray(codeQueueWorkdirsBody.workdirs)
|
||||
&& codeQueueWorkdirsBody.workdirs.some((workdir) => workdir?.path === "/workspace")
|
||||
&& codeQueueWorkdirsBody.workdirs.every((workdir) => typeof workdir?.path === "string" && typeof workdir?.providerId === "string" && typeof workdir?.executionMode === "string"),
|
||||
codeQueueWorkdirs);
|
||||
addSelectedCheck(checks, options, "microservice:code-queue-tasks", (codeQueueTasks as { ok?: boolean }).ok === true && codeQueueTasksBody?.ok === true && Array.isArray(codeQueueTasksBody.tasks) && codeQueueTasksBody.queue?.defaultModel === "gpt-5.5" && codeQueueTasksBody.queue?.modelReasoningEfforts?.["gpt-5.5"] === "xhigh", codeQueueTasks);
|
||||
addSelectedCheck(checks, options, "microservice:decision-center-status", (decisionCenterStatus as { ok?: boolean }).ok === true && (decisionCenterStatus as { body?: { microservice?: { id?: string; providerId?: string } } }).body?.microservice?.providerId === "D601", decisionCenterStatus);
|
||||
addSelectedCheck(checks, options, "microservice:decision-center-health", (decisionCenterHealth as { ok?: boolean }).ok === true && decisionCenterHealthBody?.ok === true && decisionCenterHealthBody.service === "decision-center" && decisionCenterHealthBody.storage === "postgres" && decisionCenterHealthBody.schemaReady === true, decisionCenterHealth);
|
||||
@@ -1606,6 +1620,7 @@ async function frontendCheck(config: UniDeskConfig, urls: PublicUrls, checks: E2
|
||||
const needDecisionCenter = wants("frontend:decision-center-visible");
|
||||
const needCodeQueue = wantsAny([
|
||||
"frontend:code-queue-integrated-visible",
|
||||
"frontend:code-queue-workdirs-loaded",
|
||||
"frontend:code-queue-enqueue-await-smoke",
|
||||
"frontend:code-queue-summary-mobile-wrap",
|
||||
"frontend:code-queue-initial-prompt-full-expand",
|
||||
@@ -1701,7 +1716,7 @@ async function frontendCheck(config: UniDeskConfig, urls: PublicUrls, checks: E2
|
||||
let codeQueueTaskCount = 0;
|
||||
let codeQueueOptions: string[] = [];
|
||||
let codeQueueSwitchMetrics: any = { optionCount: 0, switched: false };
|
||||
let codeQueueSubmitQueueControl: any = { tagName: "", createButtonVisible: false, oldInputMissing: false };
|
||||
let codeQueueSubmitQueueControl: any = { tagName: "", createButtonVisible: false, oldInputMissing: false, workdirSelectOptionTexts: [] };
|
||||
let codeQueueTracePlacement: any = { firstChildIsTrace: false, noPageTopStatus: false, filterInsideTracePanel: false, traceStatusVisible: false, markAllReadVisible: false };
|
||||
let codeQueueGlobalStatus: any = { activeMicroserviceVisible: false };
|
||||
let codeQueueSidebarUpdateMetrics: any = { cardCount: 0, labels: [], hasRecentUpdateLabel: false };
|
||||
@@ -2100,6 +2115,7 @@ async function frontendCheck(config: UniDeskConfig, urls: PublicUrls, checks: E2
|
||||
const prompt = document.querySelector('[data-testid="code-queue-task-form"] textarea') as HTMLTextAreaElement | null;
|
||||
const provider = document.querySelector('[data-testid="codex-provider-select"]') as HTMLSelectElement | null;
|
||||
const cwd = document.querySelector('[data-testid="codex-cwd-input"]') as HTMLInputElement | null;
|
||||
const cwdSelect = document.querySelector('[data-testid="codex-cwd-select"]') as HTMLSelectElement | null;
|
||||
const maxAttempts = document.querySelector('[data-testid="codex-max-attempts-input"]') as HTMLInputElement | null;
|
||||
const moveSelect = document.querySelector('[data-testid="codex-task-queue-move-select"]') as HTMLSelectElement | null;
|
||||
const moveButton = document.querySelector('[data-testid="codex-task-queue-move-button"]') as HTMLButtonElement | null;
|
||||
@@ -2116,6 +2132,8 @@ async function frontendCheck(config: UniDeskConfig, urls: PublicUrls, checks: E2
|
||||
providerValue: provider?.value || "",
|
||||
providerOptions: Array.from(provider?.options || []).map((option) => ({ value: option.value, text: option.textContent || "" })),
|
||||
cwdValue: cwd?.value || "",
|
||||
workdirSelectValue: cwdSelect?.value || "",
|
||||
workdirSelectOptionTexts: Array.from(cwdSelect?.options || []).map((option) => option.textContent || ""),
|
||||
maxAttemptsMax: maxAttempts?.max || "",
|
||||
maxAttemptsValue: maxAttempts?.value || "",
|
||||
moveQueueVisible: Boolean(moveSelect && moveSelect.offsetParent !== null && moveButton && moveButton.offsetParent !== null),
|
||||
@@ -2569,6 +2587,7 @@ async function frontendCheck(config: UniDeskConfig, urls: PublicUrls, checks: E2
|
||||
const prompt = document.querySelector('[data-testid="code-queue-task-form"] textarea') as HTMLTextAreaElement | null;
|
||||
const provider = document.querySelector('[data-testid="codex-provider-select"]') as HTMLSelectElement | null;
|
||||
const cwd = document.querySelector('[data-testid="codex-cwd-input"]') as HTMLInputElement | null;
|
||||
const cwdSelect = document.querySelector('[data-testid="codex-cwd-select"]') as HTMLSelectElement | null;
|
||||
const maxAttempts = document.querySelector('[data-testid="codex-max-attempts-input"]') as HTMLInputElement | null;
|
||||
const moveSelect = document.querySelector('[data-testid="codex-task-queue-move-select"]') as HTMLSelectElement | null;
|
||||
const moveButton = document.querySelector('[data-testid="codex-task-queue-move-button"]') as HTMLButtonElement | null;
|
||||
@@ -2585,6 +2604,8 @@ async function frontendCheck(config: UniDeskConfig, urls: PublicUrls, checks: E2
|
||||
providerValue: provider?.value || "",
|
||||
providerOptions: Array.from(provider?.options || []).map((option) => ({ value: option.value, text: option.textContent || "" })),
|
||||
cwdValue: cwd?.value || "",
|
||||
workdirSelectValue: cwdSelect?.value || "",
|
||||
workdirSelectOptionTexts: Array.from(cwdSelect?.options || []).map((option) => option.textContent || ""),
|
||||
maxAttemptsMax: maxAttempts?.max || "",
|
||||
maxAttemptsValue: maxAttempts?.value || "",
|
||||
moveQueueVisible: Boolean(moveSelect && moveSelect.offsetParent !== null && moveButton && moveButton.offsetParent !== null),
|
||||
@@ -3184,6 +3205,14 @@ async function frontendCheck(config: UniDeskConfig, urls: PublicUrls, checks: E2
|
||||
&& !decisionCenterText.includes("{\n"),
|
||||
{ decisionCenterMetrics, decisionCenterE2eRecord, decisionCenterDeleteResult, decisionCenterTextPreview: decisionCenterText.slice(0, 1400) });
|
||||
addSelectedCheck(checks, options, "frontend:code-queue-integrated-visible", codeQueueTextLower.includes("code queue") && codeQueueText.includes("gpt-5.4-mini") && codeQueueText.includes("gpt-5.4") && codeQueueText.includes("gpt-5.5") && codeQueueText.includes("提交任务") && codeQueueText.includes("执行 Provider") && codeQueueText.includes("入队份数") && codeQueueText.includes("追加 prompt") && codeQueueText.includes("打断") && codeQueueTextLower.includes("查看 queue") && codeQueueText.includes("创建 queue") && codeQueueText.includes("合并 queue") && codeQueueOptions.some((text) => text.includes("All queues")) && codeQueueTracePlacement.firstChildIsTrace === true && codeQueueTracePlacement.noPageTopStatus === true && codeQueueTracePlacement.filterInsideTracePanel === true && codeQueueTracePlacement.taskSearchVisible === true && codeQueueTracePlacement.traceStatusVisible === true && codeQueueTracePlacement.markAllReadVisible === true && codeQueueGlobalStatus.activeMicroserviceVisible === true && codeQueueSidebarUpdateMetrics.hasRecentUpdateLabel === true && codeQueueHtmlGuard.rootAttrMissing === true && codeQueueHtmlGuard.sourceAttrMissing === true && codeQueueHtmlGuard.sourceNoBasePrompt === true && codeQueueSubmitQueueControl.tagName === "select" && codeQueueSubmitQueueControl.createButtonVisible === true && codeQueueSubmitQueueControl.mergeButtonVisible === true && codeQueueSubmitQueueControl.mergeSourceInlineMissing === true && codeQueueSubmitQueueControl.mergeDialogMissingBeforeClick === true && (codeQueueSubmitQueueControl.mergeButtonDisabled === true || (codeQueueSubmitQueueControl.mergeDialogVisible === true && codeQueueSubmitQueueControl.mergeDialogSelectVisible === true && Number(codeQueueSubmitQueueControl.mergeDialogSourceOptionCount || 0) > 1 && codeQueueSubmitQueueControl.mergeDialogSelectInsideSubmitForm !== true && codeQueueSubmitQueueControl.mergeDialogUsesCommonComponent === true && codeQueueSubmitQueueControl.mergeDialogDeleteNoteVisible === true)) && codeQueueSubmitQueueControl.oldInputMissing === true && codeQueueSubmitQueueControl.providerValue === "D601" && codeQueueSubmitQueueControl.cwdValue === "/workspace" && Array.isArray(codeQueueSubmitQueueControl.providerOptions) && codeQueueSubmitQueueControl.providerOptions.some((item: any) => item.value === "D601" && String(item.text || "").includes("/workspace")) && codeQueueSubmitQueueControl.maxAttemptsMax === "99" && codeQueueSubmitQueueControl.maxAttemptsValue === "99" && codeQueueSubmitQueueControl.moveQueueVisible === true && codeQueuePromptDefaultEmpty === true && codeQueueSubmitGuard.batchRowVisible === true && codeQueueSubmitGuard.checkboxVisible === true && codeQueueSubmitGuard.disabledBeforeConfirm === true && codeQueueSubmitGuard.enabledAfterConfirm === true && codeQueueSubmitGuard.waitElementMissingBeforeSubmit === true && codeQueueScrollbarMetrics.transcriptThin === true && codeQueueScrollbarMetrics.toolHorizontalHidden === true && (codeQueueSwitchMetrics.optionCount <= 1 || codeQueueSwitchMetrics.switched === true) && codeQueueTextLower.includes("attempts") && codeQueueText.includes("仅 UniDesk frontend 代理访问") && (codeQueueTaskCount === 0 || codeQueueOutputText.includes("Submitted prompt")), { codeQueueTaskCount, codeQueueOptions, codeQueueSwitchMetrics, codeQueueSubmitQueueControl, codeQueueSubmitGuard, codeQueueScrollbarMetrics, codeQueuePromptDefaultEmpty, codeQueueTracePlacement, codeQueueGlobalStatus, codeQueueSidebarUpdateMetrics, codeQueueHtmlGuard, codeQueueOutputPreview: codeQueueOutputText.slice(0, 900), codeQueueTextPreview: codeQueueText.slice(0, 1400) });
|
||||
addSelectedCheck(checks, options, "frontend:code-queue-workdirs-loaded",
|
||||
codeQueueSubmitQueueControl.cwdValue === "/workspace"
|
||||
&& codeQueueSubmitQueueControl.workdirSelectValue === "/workspace"
|
||||
&& Array.isArray(codeQueueSubmitQueueControl.workdirSelectOptionTexts)
|
||||
&& codeQueueSubmitQueueControl.workdirSelectOptionTexts.some((text: string) => text.includes("/workspace"))
|
||||
&& !codeQueueText.includes("加载工作目录失败")
|
||||
&& !codeQueueText.includes("HTTP 404"),
|
||||
{ codeQueueSubmitQueueControl, codeQueueTextPreview: codeQueueText.slice(0, 1200) });
|
||||
addSelectedCheck(checks, options, "frontend:code-queue-enqueue-await-smoke",
|
||||
codeQueueEnqueueAwaitSmoke.checked === true
|
||||
&& codeQueueEnqueueAwaitSmoke.delayedPostCount === 1
|
||||
|
||||
@@ -395,6 +395,7 @@ function codeQueueK3sServiceIdForRequest(method: string, targetPath: string): st
|
||||
function codeQueueMasterControlPath(method: string, targetPath: string): boolean {
|
||||
const normalizedMethod = method.toUpperCase();
|
||||
if (targetPath === "/" || targetPath === "/health" || targetPath === "/live" || targetPath === "/logs") return true;
|
||||
if (targetPath === "/api/workdirs" || /^\/api\/workdirs\/[^/]+\/[^/]+\/.+$/u.test(targetPath)) return true;
|
||||
if (targetPath === "/api/queue-claim-move/self-test") return true;
|
||||
if (targetPath === "/api/queues" || targetPath === "/api/queues/merge") return true;
|
||||
if (/^\/api\/queues\/[^/]+(?:\/merge)?$/u.test(targetPath)) return true;
|
||||
@@ -404,6 +405,38 @@ function codeQueueMasterControlPath(method: string, targetPath: string): boolean
|
||||
return false;
|
||||
}
|
||||
|
||||
function codeQueueCompatWorkdirsResponse(service: MicroserviceConfig, status: number, contentType: string, bodyText: string): Response | null {
|
||||
if (status !== 404) return null;
|
||||
let body: any = null;
|
||||
try {
|
||||
body = JSON.parse(bodyText);
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
if (body?.path !== "/api/workdirs") return null;
|
||||
const providerId = "D601";
|
||||
return jsonResponse({
|
||||
ok: true,
|
||||
workdirs: [{
|
||||
providerId,
|
||||
executionMode: "default",
|
||||
path: "/workspace",
|
||||
createdAt: null,
|
||||
updatedAt: null,
|
||||
source: "backend-core-compat",
|
||||
}],
|
||||
defaultProviderId: providerId,
|
||||
defaultWorkdir: "/workspace",
|
||||
remoteDefaultWorkdir: "/home/ubuntu",
|
||||
source: "backend-core-compat-code-queue-workdirs",
|
||||
compat: {
|
||||
reason: "code-queue-mgr version does not yet expose /api/workdirs",
|
||||
upstreamStatus: status,
|
||||
upstreamContentType: contentType,
|
||||
},
|
||||
}, 200);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Cache helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -1026,7 +1059,24 @@ async function fetchMicroserviceUpstreamResponse(
|
||||
): Promise<Response> {
|
||||
if (service.id === "code-queue" && codeQueueMasterControlPath(method, targetPath)) {
|
||||
const mgr = microserviceById("code-queue-mgr");
|
||||
if (mgr !== null) return directMicroserviceResponse(mgr, method, targetPath, proxyOptions, requestHeaders, bodyText, abortSignal);
|
||||
if (mgr !== null) {
|
||||
const response = await directMicroserviceResponse(mgr, method, targetPath, proxyOptions, requestHeaders, bodyText, abortSignal);
|
||||
if (method.toUpperCase() === "GET" && targetPath === "/api/workdirs") {
|
||||
const contentType = response.headers.get("content-type") ?? "text/plain; charset=utf-8";
|
||||
const responseBody = await response.text();
|
||||
const compat = codeQueueCompatWorkdirsResponse(service, response.status, contentType, responseBody);
|
||||
if (compat !== null) return compat;
|
||||
return new Response(responseBody, {
|
||||
status: response.status,
|
||||
headers: {
|
||||
"content-type": contentType,
|
||||
"x-unidesk-proxy-mode": response.headers.get("x-unidesk-proxy-mode") ?? "direct",
|
||||
"x-unidesk-response-truncated": response.headers.get("x-unidesk-response-truncated") ?? "false",
|
||||
},
|
||||
});
|
||||
}
|
||||
return response;
|
||||
}
|
||||
logger("warn", "code_queue_mgr_missing_fallback_to_d601", { method, targetPath });
|
||||
}
|
||||
if (isK3sctlManagedMicroservice(service)) {
|
||||
|
||||
@@ -154,6 +154,47 @@ fn code_queue_scheduler_only_path(method: &Method, target_path: &str) -> bool {
|
||||
&& !(target_path == "/" || target_path == "/health")
|
||||
}
|
||||
|
||||
fn code_queue_fallback_workdirs_response(
|
||||
service: &MicroserviceConfig,
|
||||
status: u16,
|
||||
content_type: &str,
|
||||
body_text: &str,
|
||||
) -> Option<Response> {
|
||||
if status != 404 {
|
||||
return None;
|
||||
}
|
||||
let body = serde_json::from_str::<Value>(body_text).ok()?;
|
||||
if body.get("path").and_then(Value::as_str) != Some("/api/workdirs") {
|
||||
return None;
|
||||
}
|
||||
let provider_id = service.provider_id.clone();
|
||||
let fallback = json!({
|
||||
"ok": true,
|
||||
"workdirs": [{
|
||||
"providerId": provider_id,
|
||||
"executionMode": "default",
|
||||
"path": "/workspace",
|
||||
"createdAt": null,
|
||||
"updatedAt": null,
|
||||
"source": "backend-core-compat"
|
||||
}],
|
||||
"defaultProviderId": provider_id,
|
||||
"defaultWorkdir": "/workspace",
|
||||
"remoteDefaultWorkdir": "/home/ubuntu",
|
||||
"source": "backend-core-compat-code-queue-workdirs",
|
||||
"compat": {
|
||||
"reason": "code-queue-mgr version does not yet expose /api/workdirs",
|
||||
"upstreamStatus": status,
|
||||
"upstreamContentType": content_type
|
||||
}
|
||||
});
|
||||
Some(response_with_body(
|
||||
200,
|
||||
"application/json; charset=utf-8",
|
||||
Body::from(serde_json::to_string(&fallback).unwrap_or_else(|_| "{\"ok\":true,\"workdirs\":[]}".to_string())),
|
||||
))
|
||||
}
|
||||
|
||||
fn path_matches_task_detail(path: &str) -> bool {
|
||||
path.strip_prefix("/api/tasks/")
|
||||
.is_some_and(|rest| !rest.is_empty() && !rest.contains('/'))
|
||||
@@ -926,7 +967,7 @@ async fn fetch_microservice_upstream_response(
|
||||
)
|
||||
.await;
|
||||
}
|
||||
return direct_microservice_response(
|
||||
let response = direct_microservice_response(
|
||||
&direct_code_queue_mgr_service(service, None),
|
||||
method,
|
||||
target_path,
|
||||
@@ -935,6 +976,18 @@ async fn fetch_microservice_upstream_response(
|
||||
body_text,
|
||||
)
|
||||
.await;
|
||||
if method == Method::GET && target_path == "/api/workdirs" {
|
||||
let (status, content_type, response_body) = response_to_text(response).await.unwrap_or((
|
||||
502,
|
||||
"text/plain".to_string(),
|
||||
"failed to read code-queue workdirs response".to_string(),
|
||||
));
|
||||
if let Some(fallback) = code_queue_fallback_workdirs_response(service, status, &content_type, &response_body) {
|
||||
return fallback;
|
||||
}
|
||||
return response_with_body(status, &content_type, Body::from(response_body));
|
||||
}
|
||||
return response;
|
||||
}
|
||||
if is_k3sctl_managed_microservice(service) {
|
||||
return k3sctl_adapter_microservice_response(
|
||||
|
||||
@@ -18,6 +18,7 @@ const MAX_BODY_BYTES: usize = 512 * 1024;
|
||||
const MAX_LIST_LIMIT: i64 = 200;
|
||||
const MAX_OUTPUT_PAGE_LIMIT: usize = 500;
|
||||
const MAX_PREVIEW_CHARS: usize = 6000;
|
||||
const WORKDIR_MAX_CHARS: usize = 512;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Config {
|
||||
@@ -347,6 +348,14 @@ fn ensure_schema(state: &AppState) -> Result<(), String> {
|
||||
created_at TIMESTAMPTZ NOT NULL,
|
||||
updated_at TIMESTAMPTZ NOT NULL
|
||||
);
|
||||
CREATE TABLE IF NOT EXISTS unidesk_code_queue_workdirs (
|
||||
provider_id TEXT NOT NULL,
|
||||
execution_mode TEXT NOT NULL DEFAULT 'default',
|
||||
path TEXT NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL,
|
||||
updated_at TIMESTAMPTZ NOT NULL,
|
||||
PRIMARY KEY (provider_id, execution_mode, path)
|
||||
);
|
||||
ALTER TABLE unidesk_code_queue_tasks ADD COLUMN IF NOT EXISTS queue_id TEXT NOT NULL DEFAULT 'default';
|
||||
ALTER TABLE unidesk_code_queue_tasks ADD COLUMN IF NOT EXISTS provider_id TEXT NOT NULL DEFAULT 'main-server';
|
||||
ALTER TABLE unidesk_code_queue_tasks ADD COLUMN IF NOT EXISTS execution_mode TEXT NOT NULL DEFAULT 'default';
|
||||
@@ -355,6 +364,7 @@ fn ensure_schema(state: &AppState) -> Result<(), String> {
|
||||
ALTER TABLE unidesk_code_queue_tasks ADD COLUMN IF NOT EXISTS reference_injection JSONB;
|
||||
ALTER TABLE unidesk_code_queue_tasks ADD COLUMN IF NOT EXISTS read_at TIMESTAMPTZ;
|
||||
ALTER TABLE unidesk_code_queue_queues ADD COLUMN IF NOT EXISTS name TEXT NOT NULL DEFAULT '';
|
||||
ALTER TABLE unidesk_code_queue_workdirs ADD COLUMN IF NOT EXISTS execution_mode TEXT NOT NULL DEFAULT 'default';
|
||||
CREATE INDEX IF NOT EXISTS idx_code_queue_tasks_list ON unidesk_code_queue_tasks (status, updated_at DESC, id DESC);
|
||||
CREATE INDEX IF NOT EXISTS idx_code_queue_tasks_queue_updated ON unidesk_code_queue_tasks (queue_id, updated_at DESC, id DESC);
|
||||
",
|
||||
@@ -370,6 +380,7 @@ fn ensure_schema(state: &AppState) -> Result<(), String> {
|
||||
&[&DEFAULT_QUEUE_ID.to_string(), &DEFAULT_QUEUE_ID.to_string(), &now],
|
||||
)
|
||||
.map_err(|error| error_detail(&error))?;
|
||||
upsert_default_workdir(&mut client, state, &now)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1093,6 +1104,185 @@ fn update_queue(state: &AppState, queue_id_value: &str, request: &mut Request) -
|
||||
))
|
||||
}
|
||||
|
||||
fn default_workdir_record(state: &AppState, timestamp: &str) -> Result<Value, String> {
|
||||
let provider_id = normalize_workdir_provider_id(state, &state.config.default_provider_id);
|
||||
let path = normalize_workdir_path(state, &provider_id, &state.config.default_workdir)?;
|
||||
Ok(json!({
|
||||
"providerId": provider_id,
|
||||
"executionMode": "default",
|
||||
"path": path,
|
||||
"createdAt": timestamp,
|
||||
"updatedAt": timestamp
|
||||
}))
|
||||
}
|
||||
|
||||
fn workdir_key(record: &Value) -> String {
|
||||
format!(
|
||||
"{}\u{0}{}\u{0}{}",
|
||||
record.get("providerId").and_then(Value::as_str).unwrap_or(""),
|
||||
record.get("executionMode").and_then(Value::as_str).unwrap_or("default"),
|
||||
record.get("path").and_then(Value::as_str).unwrap_or("")
|
||||
)
|
||||
}
|
||||
|
||||
fn sorted_workdir_records(records: Vec<Value>) -> Vec<Value> {
|
||||
let mut keyed = std::collections::BTreeMap::<String, Value>::new();
|
||||
for record in records {
|
||||
keyed.insert(workdir_key(&record), record);
|
||||
}
|
||||
keyed.into_values().collect()
|
||||
}
|
||||
|
||||
fn upsert_default_workdir(client: &mut Client, state: &AppState, timestamp: &str) -> Result<(), String> {
|
||||
let record = default_workdir_record(state, timestamp)?;
|
||||
let provider_id = record.get("providerId").and_then(Value::as_str).unwrap_or("").to_string();
|
||||
let execution_mode = record.get("executionMode").and_then(Value::as_str).unwrap_or("default").to_string();
|
||||
let path = record.get("path").and_then(Value::as_str).unwrap_or("").to_string();
|
||||
client
|
||||
.execute(
|
||||
"
|
||||
INSERT INTO unidesk_code_queue_workdirs (provider_id, execution_mode, path, created_at, updated_at)
|
||||
VALUES ($1, $2, $3, $4::text::timestamptz, $4::text::timestamptz)
|
||||
ON CONFLICT (provider_id, execution_mode, path) DO UPDATE SET updated_at = EXCLUDED.updated_at
|
||||
",
|
||||
&[&provider_id, &execution_mode, &path, ×tamp],
|
||||
)
|
||||
.map_err(|error| error_detail(&error))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn load_workdirs(state: &AppState) -> Result<Vec<Value>, String> {
|
||||
let mut client = db_client(state, "unidesk-code-queue-mgr-rust-workdirs")?;
|
||||
let rows = client
|
||||
.query(
|
||||
"
|
||||
SELECT provider_id, execution_mode, path, created_at::text, updated_at::text
|
||||
FROM unidesk_code_queue_workdirs
|
||||
ORDER BY provider_id ASC, execution_mode ASC, path ASC
|
||||
",
|
||||
&[],
|
||||
)
|
||||
.map_err(|error| error_detail(&error))?;
|
||||
let mut records = Vec::new();
|
||||
if let Ok(record) = default_workdir_record(state, &now_iso()) {
|
||||
records.push(record);
|
||||
}
|
||||
for row in rows {
|
||||
let provider_id = normalize_workdir_provider_id(state, row.get::<_, &str>(0));
|
||||
let execution_mode_raw: &str = row.get(1);
|
||||
let execution_mode = match execution_mode_raw {
|
||||
"windows-native" => "windows-native",
|
||||
_ => "default",
|
||||
};
|
||||
let path_raw: &str = row.get(2);
|
||||
let path = normalize_workdir_path(state, &provider_id, path_raw)?;
|
||||
records.push(json!({
|
||||
"providerId": provider_id,
|
||||
"executionMode": execution_mode,
|
||||
"path": path,
|
||||
"createdAt": row.get::<_, String>(3),
|
||||
"updatedAt": row.get::<_, String>(4)
|
||||
}));
|
||||
}
|
||||
Ok(sorted_workdir_records(records))
|
||||
}
|
||||
|
||||
fn list_workdirs(state: &AppState, url: &RequestUrl) -> Result<(Value, u16), (Value, u16)> {
|
||||
let provider_filter = query_param(url, "providerId")
|
||||
.filter(|value| !value.trim().is_empty())
|
||||
.map(|value| normalize_workdir_provider_id(state, &value));
|
||||
let mode_filter = query_param(url, "executionMode")
|
||||
.filter(|value| !value.trim().is_empty())
|
||||
.map(|value| normalize_execution_mode(Some(&json!(value))));
|
||||
let records = load_workdirs(state).map_err(|error| (json!({ "ok": false, "error": error }), 500))?;
|
||||
let workdirs = records
|
||||
.into_iter()
|
||||
.filter(|record| {
|
||||
if let Some(provider_id) = &provider_filter {
|
||||
if record.get("providerId").and_then(Value::as_str) != Some(provider_id.as_str()) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
if let Some(mode) = &mode_filter {
|
||||
if record.get("executionMode").and_then(Value::as_str) != Some(mode.as_str()) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
true
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
Ok((json!({
|
||||
"ok": true,
|
||||
"workdirs": workdirs,
|
||||
"defaultProviderId": state.config.default_provider_id,
|
||||
"defaultWorkdir": state.config.default_workdir,
|
||||
"remoteDefaultWorkdir": state.config.remote_default_workdir,
|
||||
"source": "code-queue-mgr-rust-postgres"
|
||||
}), 200))
|
||||
}
|
||||
|
||||
fn create_workdir(state: &AppState, request: &mut Request) -> Result<(Value, u16), (Value, u16)> {
|
||||
let body = read_json_body(request)?;
|
||||
let provider_id = normalize_workdir_provider_id(state, body.get("providerId").and_then(Value::as_str).unwrap_or(""));
|
||||
let execution_mode = normalize_execution_mode(body.get("executionMode"));
|
||||
let raw_path = body
|
||||
.get("path")
|
||||
.or_else(|| body.get("cwd"))
|
||||
.or_else(|| body.get("workdir"))
|
||||
.and_then(Value::as_str)
|
||||
.unwrap_or("");
|
||||
let path = normalize_workdir_path(state, &provider_id, raw_path).map_err(|error| (json!({ "ok": false, "error": error }), 400))?;
|
||||
let now = now_iso();
|
||||
let mut client = db_client(state, "unidesk-code-queue-mgr-rust-workdir-create").map_err(|error| (json!({ "ok": false, "error": error }), 500))?;
|
||||
client
|
||||
.execute(
|
||||
"
|
||||
INSERT INTO unidesk_code_queue_workdirs (provider_id, execution_mode, path, created_at, updated_at)
|
||||
VALUES ($1, $2, $3, $4::text::timestamptz, $4::text::timestamptz)
|
||||
ON CONFLICT (provider_id, execution_mode, path) DO UPDATE SET updated_at = EXCLUDED.updated_at
|
||||
",
|
||||
&[&provider_id, &execution_mode, &path, &now],
|
||||
)
|
||||
.map_err(|error| (json!({ "ok": false, "error": error_detail(&error) }), 500))?;
|
||||
let workdir = json!({ "providerId": provider_id, "executionMode": execution_mode, "path": path, "createdAt": now, "updatedAt": now });
|
||||
let workdirs = load_workdirs(state).map_err(|error| (json!({ "ok": false, "error": error }), 500))?;
|
||||
Ok((json!({
|
||||
"ok": true,
|
||||
"workdir": workdir,
|
||||
"workdirs": workdirs,
|
||||
"ensure": { "ok": true, "skipped": true, "reason": "code-queue-mgr records workdir menu options only; provider directories are created by the execution plane" }
|
||||
}), 201))
|
||||
}
|
||||
|
||||
fn delete_workdir(state: &AppState, provider_id_value: &str, execution_mode_value: &str, path_value: &str) -> Result<(Value, u16), (Value, u16)> {
|
||||
let provider_id = normalize_workdir_provider_id(state, provider_id_value);
|
||||
let execution_mode = normalize_execution_mode(Some(&json!(execution_mode_value)));
|
||||
let path = normalize_workdir_path(state, &provider_id, path_value).map_err(|error| (json!({ "ok": false, "error": error }), 400))?;
|
||||
let mut client = db_client(state, "unidesk-code-queue-mgr-rust-workdir-delete").map_err(|error| (json!({ "ok": false, "error": error }), 500))?;
|
||||
let rows = client
|
||||
.query(
|
||||
"
|
||||
DELETE FROM unidesk_code_queue_workdirs
|
||||
WHERE provider_id = $1 AND execution_mode = $2 AND path = $3
|
||||
RETURNING provider_id, execution_mode, path, created_at::text, updated_at::text
|
||||
",
|
||||
&[&provider_id, &execution_mode, &path],
|
||||
)
|
||||
.map_err(|error| (json!({ "ok": false, "error": error_detail(&error) }), 500))?;
|
||||
let Some(row) = rows.first() else {
|
||||
return Ok((json!({ "ok": false, "error": "workdir not found" }), 404));
|
||||
};
|
||||
let deleted = json!({
|
||||
"providerId": row.get::<_, String>(0),
|
||||
"executionMode": row.get::<_, String>(1),
|
||||
"path": row.get::<_, String>(2),
|
||||
"createdAt": row.get::<_, String>(3),
|
||||
"updatedAt": row.get::<_, String>(4)
|
||||
});
|
||||
let workdirs = load_workdirs(state).map_err(|error| (json!({ "ok": false, "error": error }), 500))?;
|
||||
Ok((json!({ "ok": true, "deleted": deleted, "workdirs": workdirs }), 200))
|
||||
}
|
||||
|
||||
fn normalize_provider_id(state: &AppState, value: Option<&Value>) -> String {
|
||||
value
|
||||
.and_then(Value::as_str)
|
||||
@@ -1102,6 +1292,20 @@ fn normalize_provider_id(state: &AppState, value: Option<&Value>) -> String {
|
||||
.to_string()
|
||||
}
|
||||
|
||||
fn normalize_workdir_provider_id(state: &AppState, value: &str) -> String {
|
||||
let text = value.trim();
|
||||
if !text.is_empty()
|
||||
&& text.len() <= 64
|
||||
&& text
|
||||
.chars()
|
||||
.all(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '_' | '.' | '-'))
|
||||
{
|
||||
text.to_string()
|
||||
} else {
|
||||
state.config.default_provider_id.clone()
|
||||
}
|
||||
}
|
||||
|
||||
fn normalize_cwd(state: &AppState, provider_id: &str, value: Option<&Value>) -> String {
|
||||
value
|
||||
.and_then(Value::as_str)
|
||||
@@ -1111,6 +1315,34 @@ fn normalize_cwd(state: &AppState, provider_id: &str, value: Option<&Value>) ->
|
||||
.unwrap_or_else(|| if provider_id == "main-server" { state.config.default_workdir.clone() } else { state.config.remote_default_workdir.clone() })
|
||||
}
|
||||
|
||||
fn default_workdir_for_provider(state: &AppState, provider_id: &str) -> String {
|
||||
if provider_id == state.config.default_provider_id {
|
||||
state.config.default_workdir.clone()
|
||||
} else {
|
||||
state.config.remote_default_workdir.clone()
|
||||
}
|
||||
}
|
||||
|
||||
fn normalize_workdir_path(state: &AppState, provider_id: &str, value: &str) -> Result<String, String> {
|
||||
let raw = value.trim();
|
||||
if raw.is_empty() {
|
||||
return Err("workdir path is required".to_string());
|
||||
}
|
||||
if raw.chars().count() > WORKDIR_MAX_CHARS {
|
||||
return Err(format!("workdir path must be {WORKDIR_MAX_CHARS} characters or fewer"));
|
||||
}
|
||||
if raw.contains('\0') {
|
||||
return Err("workdir path contains an invalid character".to_string());
|
||||
}
|
||||
let path = if raw.starts_with('/') {
|
||||
raw.to_string()
|
||||
} else {
|
||||
format!("{}/{}", default_workdir_for_provider(state, provider_id).trim_end_matches('/'), raw)
|
||||
};
|
||||
let trimmed = path.trim_end_matches('/').to_string();
|
||||
Ok(if trimmed.is_empty() { "/".to_string() } else { trimmed })
|
||||
}
|
||||
|
||||
fn normalize_execution_mode(value: Option<&Value>) -> String {
|
||||
match value.and_then(Value::as_str).unwrap_or("").trim().to_ascii_lowercase().as_str() {
|
||||
"windows-native" | "windows" | "win32" | "native-windows" => "windows-native".to_string(),
|
||||
@@ -1924,7 +2156,7 @@ fn health(state: &AppState) -> (Value, u16) {
|
||||
"readPath": "column-first"
|
||||
},
|
||||
"endpoints": {
|
||||
"control": ["/api/queues", "/api/tasks", "/api/tasks/:id/(move|retry|read|edit)", "/api/queue-claim-move/self-test"],
|
||||
"control": ["/api/queues", "/api/tasks", "/api/tasks/:id/(move|retry|read|edit)", "/api/workdirs", "/api/queue-claim-move/self-test"],
|
||||
"traceRead": ["/api/tasks/overview", "/api/tasks/:id/summary", "/api/tasks/:id/trace-summary", "/api/tasks/:id/trace-steps", "/api/tasks/:id/output"]
|
||||
}
|
||||
});
|
||||
@@ -2015,6 +2247,29 @@ fn handle_request(state: &AppState, request: &mut Request) -> (Value, u16) {
|
||||
if path == "/api/queue-claim-move/self-test" && (method == Method::Get || method == Method::Post) {
|
||||
return (json!({ "ok": true, "runtime": "rust", "check": "route-present", "claimMoveSafety": "conditional update requires unclaimed queued/retry_wait task" }), 200);
|
||||
}
|
||||
if path == "/api/workdirs" && method == Method::Get {
|
||||
return list_workdirs(state, &url).unwrap_or_else(|response| response);
|
||||
}
|
||||
if path == "/api/workdirs" && method == Method::Post {
|
||||
return create_workdir(state, request).unwrap_or_else(|response| response);
|
||||
}
|
||||
if method == Method::Delete {
|
||||
if let Some(rest) = path.strip_prefix("/api/workdirs/") {
|
||||
let mut parts = rest.splitn(3, '/');
|
||||
let provider_id = parts.next().unwrap_or_default();
|
||||
let execution_mode = parts.next().unwrap_or_default();
|
||||
let path_value = parts.next().unwrap_or_default();
|
||||
if !provider_id.is_empty() && !execution_mode.is_empty() && !path_value.is_empty() {
|
||||
return delete_workdir(
|
||||
state,
|
||||
&percent_decode(provider_id),
|
||||
&percent_decode(execution_mode),
|
||||
&percent_decode(path_value),
|
||||
)
|
||||
.unwrap_or_else(|response| response);
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(response) = route_active_control(path, &method) {
|
||||
return response;
|
||||
}
|
||||
@@ -2344,4 +2599,13 @@ mod tests {
|
||||
assert!(preview.get("text").and_then(Value::as_str).unwrap_or("").contains("...<truncated>"));
|
||||
assert!(!preview.get("text").and_then(Value::as_str).unwrap_or("").contains(marker));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn default_workdir_contract_includes_workspace() {
|
||||
let state = test_state();
|
||||
let record = default_workdir_record(&state, "2026-01-01T00:00:00.000Z").expect("default workdir");
|
||||
assert_eq!(record.get("providerId").and_then(Value::as_str), Some("D601"));
|
||||
assert_eq!(record.get("executionMode").and_then(Value::as_str), Some("default"));
|
||||
assert_eq!(record.get("path").and_then(Value::as_str), Some("/workspace"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,6 +40,14 @@ interface QueueRecord {
|
||||
updatedAt: string;
|
||||
}
|
||||
|
||||
interface WorkdirRecord {
|
||||
providerId: string;
|
||||
executionMode: CodeExecutionMode;
|
||||
path: string;
|
||||
createdAt: string;
|
||||
updatedAt: string;
|
||||
}
|
||||
|
||||
interface DailyTaskStatsBucket {
|
||||
date: string;
|
||||
executedTasks: number;
|
||||
@@ -275,6 +283,7 @@ const codeQueueEnvironmentHint = [
|
||||
"如果当前 Code Queue Docker 容器缺少完成任务所需的环境、系统包或语言依赖,可以先在容器内临时安装以推进当前任务;同时必须把该依赖补到 `src/components/microservices/code-queue/Dockerfile`,让后续任务重建镜像后可直接使用。",
|
||||
].join("\n");
|
||||
const maxTaskAttempts = 99;
|
||||
const workdirMaxLength = 512;
|
||||
const defaultCodeModels = ["gpt-5.5", "gpt-5.4-mini", "gpt-5.4", "minimax-m2.7"];
|
||||
const codeExecutionModes: CodeExecutionMode[] = ["default", "windows-native"];
|
||||
const codexStatsTimeZone = "Asia/Shanghai";
|
||||
@@ -523,10 +532,140 @@ function normalizeProviderId(value: unknown): string {
|
||||
return text.length > 0 ? text : config.defaultProviderId;
|
||||
}
|
||||
|
||||
function normalizeWorkdirProviderId(value: unknown): string {
|
||||
const text = typeof value === "string" ? value.trim() : "";
|
||||
return /^[A-Za-z0-9_.-]{1,64}$/u.test(text) ? text : config.defaultProviderId;
|
||||
}
|
||||
|
||||
function normalizeCwd(providerId: string, value: unknown): string {
|
||||
const text = typeof value === "string" && value.trim().length > 0 ? value.trim() : "";
|
||||
if (text.length > 0) return text;
|
||||
return providerId === "main-server" ? config.defaultWorkdir : config.remoteDefaultWorkdir;
|
||||
return providerId === config.defaultProviderId ? config.defaultWorkdir : config.remoteDefaultWorkdir;
|
||||
}
|
||||
|
||||
function defaultWorkdirForProvider(providerId: string): string {
|
||||
return providerId === config.defaultProviderId ? config.defaultWorkdir : config.remoteDefaultWorkdir;
|
||||
}
|
||||
|
||||
function normalizeWorkdirPath(value: unknown, providerId: string): string {
|
||||
const raw = typeof value === "string" ? value.trim() : "";
|
||||
if (raw.length === 0) throw new Error("workdir path is required");
|
||||
if (raw.length > workdirMaxLength) throw new Error(`workdir path must be ${workdirMaxLength} characters or fewer`);
|
||||
if (raw.includes("\u0000")) throw new Error("workdir path contains an invalid character");
|
||||
if (raw.startsWith("/")) return raw.replace(/\/+$/u, "") || "/";
|
||||
return `${defaultWorkdirForProvider(providerId).replace(/\/+$/u, "")}/${raw}`.replace(/\/+$/u, "") || "/";
|
||||
}
|
||||
|
||||
function defaultWorkdirRecords(timestamp = nowIso()): WorkdirRecord[] {
|
||||
const providerId = normalizeWorkdirProviderId(config.defaultProviderId);
|
||||
return [{
|
||||
providerId,
|
||||
executionMode: "default",
|
||||
path: normalizeWorkdirPath(config.defaultWorkdir, providerId),
|
||||
createdAt: timestamp,
|
||||
updatedAt: timestamp,
|
||||
}];
|
||||
}
|
||||
|
||||
function workdirRecordKey(record: Pick<WorkdirRecord, "providerId" | "executionMode" | "path">): string {
|
||||
return `${record.providerId}\u0000${record.executionMode}\u0000${record.path}`;
|
||||
}
|
||||
|
||||
function mergeWorkdirRecords(records: WorkdirRecord[]): WorkdirRecord[] {
|
||||
const byKey = new Map<string, WorkdirRecord>();
|
||||
for (const record of records) {
|
||||
byKey.set(workdirRecordKey(record), record);
|
||||
}
|
||||
return Array.from(byKey.values()).sort((left, right) => {
|
||||
const providerDelta = left.providerId.localeCompare(right.providerId);
|
||||
if (providerDelta !== 0) return providerDelta;
|
||||
const modeDelta = left.executionMode.localeCompare(right.executionMode);
|
||||
if (modeDelta !== 0) return modeDelta;
|
||||
return left.path.localeCompare(right.path);
|
||||
});
|
||||
}
|
||||
|
||||
async function loadWorkdirRecords(): Promise<WorkdirRecord[]> {
|
||||
const rows = await mgrSql<Array<{ provider_id: string; execution_mode: string; path: string; created_at: Date | string; updated_at: Date | string }>>`
|
||||
SELECT provider_id, execution_mode, path, created_at, updated_at
|
||||
FROM unidesk_code_queue_workdirs
|
||||
ORDER BY provider_id ASC, execution_mode ASC, path ASC
|
||||
`;
|
||||
const records = rows.map((row) => {
|
||||
const providerId = normalizeWorkdirProviderId(row.provider_id);
|
||||
return {
|
||||
providerId,
|
||||
executionMode: normalizeExecutionMode(row.execution_mode),
|
||||
path: normalizeWorkdirPath(row.path, providerId),
|
||||
createdAt: timestampToIso(row.created_at) ?? nowIso(),
|
||||
updatedAt: timestampToIso(row.updated_at) ?? nowIso(),
|
||||
};
|
||||
});
|
||||
return mergeWorkdirRecords([...defaultWorkdirRecords(), ...records]);
|
||||
}
|
||||
|
||||
async function upsertWorkdir(record: WorkdirRecord): Promise<void> {
|
||||
await mgrSql`
|
||||
INSERT INTO unidesk_code_queue_workdirs (provider_id, execution_mode, path, created_at, updated_at)
|
||||
VALUES (${record.providerId}, ${record.executionMode}, ${record.path}, ${record.createdAt}, ${record.updatedAt})
|
||||
ON CONFLICT (provider_id, execution_mode, path) DO UPDATE SET
|
||||
updated_at = EXCLUDED.updated_at
|
||||
`;
|
||||
}
|
||||
|
||||
function filteredWorkdirRecords(records: WorkdirRecord[], url: URL): WorkdirRecord[] {
|
||||
const providerIdParam = url.searchParams.get("providerId");
|
||||
const executionModeParam = url.searchParams.get("executionMode");
|
||||
const providerId = providerIdParam === null || providerIdParam.trim().length === 0 ? null : normalizeWorkdirProviderId(providerIdParam);
|
||||
const executionMode = executionModeParam === null || executionModeParam.trim().length === 0 ? null : normalizeExecutionMode(executionModeParam);
|
||||
return records.filter((record) => {
|
||||
if (providerId !== null && record.providerId !== providerId) return false;
|
||||
if (executionMode !== null && record.executionMode !== executionMode) return false;
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
async function listWorkdirs(url: URL): Promise<Response> {
|
||||
const records = await loadWorkdirRecords();
|
||||
return jsonResponse({
|
||||
ok: true,
|
||||
workdirs: filteredWorkdirRecords(records, url),
|
||||
defaultProviderId: config.defaultProviderId,
|
||||
defaultWorkdir: config.defaultWorkdir,
|
||||
remoteDefaultWorkdir: config.remoteDefaultWorkdir,
|
||||
source: "code-queue-mgr-postgres",
|
||||
});
|
||||
}
|
||||
|
||||
async function createWorkdir(req: Request): Promise<Response> {
|
||||
const body = asRecord(await readJson(req)) ?? {};
|
||||
const providerId = normalizeWorkdirProviderId(body.providerId);
|
||||
const executionMode = normalizeExecutionMode(body.executionMode);
|
||||
const path = normalizeWorkdirPath(body.path ?? body.cwd ?? body.workdir, providerId);
|
||||
const now = nowIso();
|
||||
const workdir: WorkdirRecord = { providerId, executionMode, path, createdAt: now, updatedAt: now };
|
||||
await upsertWorkdir(workdir);
|
||||
return jsonResponse({
|
||||
ok: true,
|
||||
workdir,
|
||||
workdirs: await loadWorkdirRecords(),
|
||||
ensure: { ok: true, skipped: true, reason: "code-queue-mgr records workdir menu options only; provider directories are created by the execution plane" },
|
||||
}, 201);
|
||||
}
|
||||
|
||||
async function deleteWorkdir(providerIdValue: string, executionModeValue: string, pathValue: string): Promise<Response> {
|
||||
const providerId = normalizeWorkdirProviderId(providerIdValue);
|
||||
const executionMode = normalizeExecutionMode(executionModeValue);
|
||||
const path = normalizeWorkdirPath(pathValue, providerId);
|
||||
const rows = await mgrSql<WorkdirRecord[]>`
|
||||
DELETE FROM unidesk_code_queue_workdirs
|
||||
WHERE provider_id = ${providerId}
|
||||
AND execution_mode = ${executionMode}
|
||||
AND path = ${path}
|
||||
RETURNING provider_id AS "providerId", execution_mode AS "executionMode", path, created_at AS "createdAt", updated_at AS "updatedAt"
|
||||
`;
|
||||
if (rows.length === 0) return jsonResponse({ ok: false, error: "workdir not found" }, 404);
|
||||
return jsonResponse({ ok: true, deleted: rows[0], workdirs: await loadWorkdirRecords() });
|
||||
}
|
||||
|
||||
function normalizeStringArray(value: unknown): string[] {
|
||||
@@ -1145,6 +1284,16 @@ async function ensureSchema(): Promise<void> {
|
||||
updated_at TIMESTAMPTZ NOT NULL
|
||||
)
|
||||
`;
|
||||
await mgrSql`
|
||||
CREATE TABLE IF NOT EXISTS unidesk_code_queue_workdirs (
|
||||
provider_id TEXT NOT NULL,
|
||||
execution_mode TEXT NOT NULL DEFAULT 'default',
|
||||
path TEXT NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL,
|
||||
updated_at TIMESTAMPTZ NOT NULL,
|
||||
PRIMARY KEY (provider_id, execution_mode, path)
|
||||
)
|
||||
`;
|
||||
await mgrSql`ALTER TABLE unidesk_code_queue_tasks ADD COLUMN IF NOT EXISTS queue_id TEXT NOT NULL DEFAULT 'default'`;
|
||||
await mgrSql`ALTER TABLE unidesk_code_queue_tasks ADD COLUMN IF NOT EXISTS provider_id TEXT NOT NULL DEFAULT 'main-server'`;
|
||||
await mgrSql`ALTER TABLE unidesk_code_queue_tasks ADD COLUMN IF NOT EXISTS execution_mode TEXT NOT NULL DEFAULT 'default'`;
|
||||
@@ -1153,8 +1302,10 @@ async function ensureSchema(): Promise<void> {
|
||||
await mgrSql`ALTER TABLE unidesk_code_queue_tasks ADD COLUMN IF NOT EXISTS reference_injection JSONB`;
|
||||
await mgrSql`ALTER TABLE unidesk_code_queue_tasks ADD COLUMN IF NOT EXISTS read_at TIMESTAMPTZ`;
|
||||
await mgrSql`ALTER TABLE unidesk_code_queue_queues ADD COLUMN IF NOT EXISTS name TEXT NOT NULL DEFAULT ''`;
|
||||
await mgrSql`ALTER TABLE unidesk_code_queue_workdirs ADD COLUMN IF NOT EXISTS execution_mode TEXT NOT NULL DEFAULT 'default'`;
|
||||
const now = nowIso();
|
||||
await upsertQueue({ id: config.defaultQueueId, name: config.defaultQueueId, createdAt: now, updatedAt: now });
|
||||
await Promise.all(defaultWorkdirRecords(now).map((record) => upsertWorkdir(record)));
|
||||
schemaReady = true;
|
||||
schemaLastError = null;
|
||||
}
|
||||
@@ -2639,7 +2790,7 @@ async function route(req: Request): Promise<Response> {
|
||||
noDockerSocket: true,
|
||||
},
|
||||
endpoints: {
|
||||
control: ["/api/queues", "/api/tasks", "/api/tasks/:id/(move|retry|read|edit)", "/api/queue-claim-move/self-test"],
|
||||
control: ["/api/queues", "/api/tasks", "/api/tasks/:id/(move|retry|read|edit)", "/api/workdirs", "/api/queue-claim-move/self-test"],
|
||||
traceRead: ["/api/tasks/overview", "/api/tasks/:id/summary", "/api/tasks/:id/trace-summary", "/api/tasks/:id/trace-steps", "/api/tasks/:id/output"],
|
||||
},
|
||||
}, schemaReady ? 200 : 503);
|
||||
@@ -2649,6 +2800,10 @@ async function route(req: Request): Promise<Response> {
|
||||
if (!schemaReady) return jsonResponse({ ok: false, error: "code-queue-mgr database schema is not ready", schemaLastError }, 503);
|
||||
return jsonResponse(await runQueueClaimMoveSelfTest(), 200);
|
||||
}
|
||||
if (url.pathname === "/api/workdirs" && req.method === "GET") return await listWorkdirs(url);
|
||||
if (url.pathname === "/api/workdirs" && req.method === "POST") return await createWorkdir(req);
|
||||
const workdirMatch = url.pathname.match(/^\/api\/workdirs\/([^/]+)\/([^/]+)\/(.+)$/u);
|
||||
if (workdirMatch !== null && req.method === "DELETE") return await deleteWorkdir(decodeURIComponent(workdirMatch[1] ?? ""), decodeURIComponent(workdirMatch[2] ?? ""), decodeURIComponent(workdirMatch[3] ?? ""));
|
||||
const activeControl = routeActiveControl(url.pathname, req.method);
|
||||
if (activeControl !== null) return jsonResponse(activeControl.body, activeControl.status);
|
||||
if (url.pathname === "/api/queues" && req.method === "GET") {
|
||||
|
||||
@@ -255,6 +255,56 @@ spec:
|
||||
bun install --frozen-lockfile
|
||||
(cd src && bun install --frozen-lockfile)
|
||||
UNIDESK_D601_RUST_CHECK=1 bun scripts/cli.ts check --full --rust
|
||||
cargo build --manifest-path src/components/microservices/code-queue-mgr/Cargo.toml --release
|
||||
CQ_MGR_ID="$(printf '%s' "$(params.revision)" | tr -cd 'A-Za-z0-9_.-' | cut -c1-48)"
|
||||
CQ_MGR_DB="$(workspaces.source.path)/code-queue-mgr-workdirs-postgres"
|
||||
rm -rf "$CQ_MGR_DB"
|
||||
mkdir -p "$CQ_MGR_DB"
|
||||
docker rm -f "code-queue-mgr-workdirs-$CQ_MGR_ID" >/dev/null 2>&1 || true
|
||||
docker run -d --name "code-queue-mgr-workdirs-$CQ_MGR_ID" \
|
||||
-p 127.0.0.1::5432 \
|
||||
-e POSTGRES_USER=unidesk_ci \
|
||||
-e POSTGRES_PASSWORD=unidesk_ci_password \
|
||||
-e POSTGRES_DB=unidesk_ci \
|
||||
-v "$CQ_MGR_DB:/var/lib/postgresql/data" \
|
||||
postgres:16-alpine >/tmp/code-queue-mgr-workdirs-postgres.cid
|
||||
CQ_MGR_DB_PORT="$(docker inspect "code-queue-mgr-workdirs-$CQ_MGR_ID" --format '{{(index (index .NetworkSettings.Ports "5432/tcp") 0).HostPort}}')"
|
||||
cleanup() {
|
||||
docker rm -f "code-queue-mgr-workdirs-$CQ_MGR_ID" >/dev/null 2>&1 || true
|
||||
}
|
||||
trap cleanup EXIT
|
||||
for _ in $(seq 1 60); do
|
||||
if docker exec "code-queue-mgr-workdirs-$CQ_MGR_ID" pg_isready -U unidesk_ci -d unidesk_ci >/dev/null 2>&1; then
|
||||
break
|
||||
fi
|
||||
sleep 1
|
||||
done
|
||||
CQ_MGR_PORT="$(python3 - <<'PY'
|
||||
import socket
|
||||
s = socket.socket()
|
||||
s.bind(("127.0.0.1", 0))
|
||||
print(s.getsockname()[1])
|
||||
s.close()
|
||||
PY
|
||||
)"
|
||||
DATABASE_URL="postgres://unidesk_ci:unidesk_ci_password@127.0.0.1:$CQ_MGR_DB_PORT/unidesk_ci" \
|
||||
HOST=127.0.0.1 \
|
||||
PORT="$CQ_MGR_PORT" \
|
||||
CODE_QUEUE_MAIN_PROVIDER_ID=D601 \
|
||||
CODE_QUEUE_WORKDIR=/workspace \
|
||||
CODE_QUEUE_REMOTE_WORKDIR=/home/ubuntu \
|
||||
src/components/microservices/code-queue-mgr/target/release/code-queue-mgr >"$(workspaces.source.path)/code-queue-mgr-workdirs.log" 2>&1 &
|
||||
cq_mgr_pid="$!"
|
||||
trap 'kill "$cq_mgr_pid" >/dev/null 2>&1 || true; cleanup' EXIT
|
||||
for _ in $(seq 1 60); do
|
||||
if curl -fsS "http://127.0.0.1:$CQ_MGR_PORT/health" >/dev/null 2>&1; then
|
||||
break
|
||||
fi
|
||||
sleep 1
|
||||
done
|
||||
workdirs_json="$(curl -fsS "http://127.0.0.1:$CQ_MGR_PORT/api/workdirs")"
|
||||
printf '%s\n' "$workdirs_json"
|
||||
WORKDIRS_JSON="$workdirs_json" bun -e 'const body=JSON.parse(process.env.WORKDIRS_JSON||"{}"); if (body.ok !== true || !Array.isArray(body.workdirs) || !body.workdirs.some((item)=>item && item.path === "/workspace" && item.providerId === "D601" && item.executionMode === "default")) { console.error(JSON.stringify(body)); process.exit(1); }'
|
||||
---
|
||||
apiVersion: tekton.dev/v1
|
||||
kind: Task
|
||||
|
||||
Reference in New Issue
Block a user