diff --git a/scripts/claudeqq-artifact-event-contract-test.ts b/scripts/claudeqq-artifact-event-contract-test.ts new file mode 100644 index 00000000..f6257827 --- /dev/null +++ b/scripts/claudeqq-artifact-event-contract-test.ts @@ -0,0 +1,143 @@ +import { readFileSync } from "node:fs"; +import { rootPath } from "./src/config"; +import { runArtifactRegistryCommand } from "./src/artifact-registry"; + +const serviceId = "claudeqq"; +const desiredCommit = "203b1f46684c91340ecbbd8a74502bd55e4f2011"; +const sourceRepo = "https://gitee.com/lyon1998/agent_skills"; +const dockerfile = "claudeqq/Dockerfile"; +const eventPaths = ["/api/events/recent", "/api/events/subscriptions"]; + +type JsonRecord = Record; + +function assertCondition(condition: boolean, message: string): void { + if (!condition) throw new Error(message); +} + +function asRecord(value: unknown, path: string): JsonRecord { + assertCondition(typeof value === "object" && value !== null && !Array.isArray(value), `${path} must be an object`); + return value as JsonRecord; +} + +function asArray(value: unknown, path: string): unknown[] { + assertCondition(Array.isArray(value), `${path} must be an array`); + return value; +} + +function stringField(value: unknown, path: string): string { + assertCondition(typeof value === "string" && value.length > 0, `${path} must be a non-empty string`); + return value; +} + +function serviceById(environment: JsonRecord, id: string, path: string): JsonRecord { + const services = asArray(environment.services, `${path}.services`).map((item, index) => asRecord(item, `${path}.services[${index}]`)); + const service = services.find((item) => item.id === id); + assertCondition(service !== undefined, `${path}.services must include ${id}`); + return service!; +} + +function assertDeployJson(): void { + const deploy = asRecord(JSON.parse(readFileSync(rootPath("deploy.json"), "utf8")), "deploy.json"); + const environments = asRecord(deploy.environments, "deploy.json.environments"); + for (const environment of ["dev", "prod"] as const) { + const service = serviceById(asRecord(environments[environment], `deploy.json.environments.${environment}`), serviceId, `deploy.json.environments.${environment}`); + assertCondition(service.repo === sourceRepo, `${environment} deploy.json claudeqq repo must match source repo`); + assertCondition(service.commitId === desiredCommit, `${environment} deploy.json claudeqq commit must match desired commit`); + } +} + +function assertArtifactCatalog(): void { + const catalog = asRecord(JSON.parse(readFileSync(rootPath("CI.json"), "utf8")), "CI.json"); + const artifacts = asArray(catalog.artifacts, "CI.json.artifacts").map((item, index) => asRecord(item, `CI.json.artifacts[${index}]`)); + const artifact = artifacts.find((item) => item.serviceId === serviceId); + assertCondition(artifact !== undefined, "CI.json must include claudeqq artifact producer"); + assertCondition(artifact!.kind === "source-build", "claudeqq artifact must be source-build"); + assertCondition(artifact!.status === "supported", "claudeqq artifact must be supported"); + assertCondition(artifact!.producer === "ci publish-user-service", "claudeqq artifact producer must be ci publish-user-service"); + const source = asRecord(artifact!.source, "CI.json claudeqq source"); + assertCondition(source.repo === sourceRepo, "claudeqq artifact source repo must match agent_skills"); + assertCondition(source.dockerfile === dockerfile, "claudeqq artifact dockerfile must be claudeqq/Dockerfile"); + const image = asRecord(artifact!.image, "CI.json claudeqq image"); + assertCondition(image.repository === "unidesk/claudeqq", "claudeqq artifact image repository must be unidesk/claudeqq"); +} + +function assertAdapterSourceContract(): void { + const adapter = readFileSync(rootPath("src/components/microservices/claudeqq/adapter.js"), "utf8"); + const imageDockerfile = readFileSync(rootPath("src/components/microservices/claudeqq/Dockerfile"), "utf8"); + for (const eventPath of eventPaths) { + assertCondition(adapter.includes(eventPath), `adapter must expose ${eventPath}`); + } + assertCondition(adapter.includes("claudeqq-event-api-v1"), "adapter health metadata must name the event API contract"); + assertCondition(adapter.includes("adapter-readonly-fallback"), "adapter must include read-only event fallback"); + assertCondition(adapter.includes("CLAUDEQQ_HOST: upstreamHost"), "adapter must bind the upstream server to the private upstream host"); + assertCondition(adapter.includes("CLAUDEQQ_PORT: String(upstreamPort)"), "adapter must bind the upstream server to the private upstream port"); + assertCondition(adapter.includes("POST /api/events/subscriptions"), "adapter health metadata must document event subscription mutation paths"); + assertCondition(imageDockerfile.includes('CMD ["node", "unidesk-adapter.cjs"]'), "claudeqq image must start the UniDesk adapter by default"); +} + +async function assertDryRun(environment: "dev" | "prod"): Promise { + const plan = asRecord(await runArtifactRegistryCommand([ + "deploy-service", + "--env", + environment, + "--service", + serviceId, + "--commit", + desiredCommit, + "--dry-run", + ]), `artifact dry-run ${environment}`); + + assertCondition(plan.ok === true, `${environment} dry-run must be ok`); + assertCondition(plan.supported === true, `${environment} dry-run must be supported`); + assertCondition(plan.dryRun === true, `${environment} dry-run must report dryRun=true`); + assertCondition(plan.mutation === false, `${environment} dry-run must not mutate`); + assertCondition(plan.serviceId === serviceId, `${environment} dry-run serviceId must be claudeqq`); + assertCondition(plan.commit === desiredCommit, `${environment} dry-run commit must match desired commit`); + assertCondition(plan.sourceRepo === sourceRepo, `${environment} dry-run source repo must match`); + + const source = asRecord(plan.source, `${environment}.source`); + assertCondition(source.repo === sourceRepo, `${environment} dry-run source.repo must match`); + assertCondition(source.commit === desiredCommit, `${environment} dry-run source.commit must match`); + assertCondition(source.dockerfile === dockerfile, `${environment} dry-run source.dockerfile must match`); + + const build = asRecord(plan.build, `${environment}.build`); + assertCondition(build.willCompile === false, `${environment} dry-run must not compile on the runtime target`); + assertCondition(build.willRunDockerBuild === false, `${environment} dry-run must not docker build on the runtime target`); + assertCondition(build.willRunDockerComposeBuild === false, `${environment} dry-run must not docker compose build on the runtime target`); + + const labels = asRecord(plan.requiredLabels, `${environment}.requiredLabels`); + assertCondition(labels["unidesk.ai/service-id"] === serviceId, `${environment} labels must include service id`); + assertCondition(labels["unidesk.ai/source-repo"] === sourceRepo, `${environment} labels must include source repo`); + assertCondition(labels["unidesk.ai/source-commit"] === desiredCommit, `${environment} labels must include source commit`); + assertCondition(labels["unidesk.ai/dockerfile"] === dockerfile, `${environment} labels must include dockerfile`); + + const target = asRecord(plan.target, `${environment}.target`); + assertCondition(target.kind === "d601-k3s", `${environment} target kind must be d601-k3s`); + assertCondition(target.namespace === (environment === "dev" ? "unidesk-dev" : "unidesk"), `${environment} namespace must match`); + assertCondition(target.deployment === (environment === "dev" ? "claudeqq-dev" : "claudeqq"), `${environment} deployment must match`); + assertCondition(target.service === (environment === "dev" ? "claudeqq-dev" : "claudeqq"), `${environment} service must match`); + assertCondition(target.runtimeImage === `unidesk-claudeqq:${desiredCommit}`, `${environment} runtime image must be commit-pinned`); + + const validation = asArray(plan.validation, `${environment}.validation`).map((item, index) => stringField(item, `${environment}.validation[${index}]`)); + assertCondition(validation.some((line) => line.includes("service health via Kubernetes API service proxy")), `${environment} dry-run must require API service proxy health`); + assertCondition(validation.some((line) => line.includes("source repo")), `${environment} dry-run must require source repo label validation`); +} + +assertDeployJson(); +assertArtifactCatalog(); +assertAdapterSourceContract(); +await assertDryRun("dev"); +await assertDryRun("prod"); + +console.log(JSON.stringify({ + ok: true, + serviceId, + desiredCommit, + eventPaths, + checks: [ + "deploy.json desired dev/prod commit", + "CI.json artifact producer contract", + "adapter read-only event API fallback and health metadata", + "dev/prod artifact-registry deploy-service dry-runs", + ], +}, null, 2)); diff --git a/src/components/microservices/claudeqq/Dockerfile b/src/components/microservices/claudeqq/Dockerfile index 2a087fee..c08984de 100644 --- a/src/components/microservices/claudeqq/Dockerfile +++ b/src/components/microservices/claudeqq/Dockerfile @@ -23,4 +23,4 @@ COPY --from=build /app/unidesk-adapter.cjs ./scripts/src/server_ts/unidesk-adapt WORKDIR /app/scripts/src/server_ts RUN npm ci --omit=dev && npm cache clean --force -CMD ["node", "dist/index.js", "--max-restarts", "0"] +CMD ["node", "unidesk-adapter.cjs"] diff --git a/src/components/microservices/claudeqq/adapter.js b/src/components/microservices/claudeqq/adapter.js index cd43a4fc..a1a3d23e 100644 --- a/src/components/microservices/claudeqq/adapter.js +++ b/src/components/microservices/claudeqq/adapter.js @@ -1,4 +1,6 @@ const http = require("node:http"); +const fs = require("node:fs"); +const path = require("node:path"); const { spawn } = require("node:child_process"); const { URL } = require("node:url"); @@ -10,6 +12,8 @@ const deployCommit = process.env.UNIDESK_DEPLOY_COMMIT || ""; const deployRequestedCommit = process.env.UNIDESK_DEPLOY_REQUESTED_COMMIT || deployCommit; const deployRepo = process.env.UNIDESK_DEPLOY_REPO || ""; const serviceId = process.env.UNIDESK_DEPLOY_SERVICE_ID || "claudeqq"; +const workspaceDir = process.env.CLAUDEQQ_WORKSPACE_DIR || "/bot_workspace"; +const stateDir = process.env.CLAUDEQQ_STATE_DIR || "/app/.state"; const startedAt = new Date().toISOString(); const endpoints = [ "/health", @@ -24,7 +28,11 @@ const endpoints = [ const child = spawn("node", ["dist/index.js", "--max-restarts", "0"], { cwd: process.env.CLAUDEQQ_SERVER_CWD || process.cwd(), - env: process.env, + env: { + ...process.env, + CLAUDEQQ_HOST: upstreamHost, + CLAUDEQQ_PORT: String(upstreamPort), + }, stdio: "inherit", }); @@ -51,6 +59,148 @@ function readBody(req) { }); } +function asRecord(value) { + return value && typeof value === "object" && !Array.isArray(value) ? value : {}; +} + +function numberOrUndefined(value) { + const numberValue = typeof value === "number" ? value : Number.parseInt(String(value || ""), 10); + return Number.isFinite(numberValue) ? numberValue : undefined; +} + +function parseJson(value, fallback) { + try { + return JSON.parse(value); + } catch { + return fallback; + } +} + +function clampLimit(value) { + const parsed = Number.parseInt(String(value || "50"), 10); + if (!Number.isFinite(parsed) || parsed <= 0) return 50; + return Math.min(parsed, 200); +} + +function textFromMessage(message) { + if (typeof message === "string") return message; + if (!Array.isArray(message)) return ""; + return message + .map((segment) => { + const data = asRecord(asRecord(segment).data); + if (typeof data.text === "string") return data.text; + return ""; + }) + .join(""); +} + +function eventFromRecord(record, fallbackIndex) { + const item = asRecord(record); + const raw = asRecord(item.raw_message || item.raw || item); + const messageId = numberOrUndefined(raw.message_id ?? item.message_id); + const time = numberOrUndefined(raw.time ?? item.time); + const userId = raw.user_id ?? item.user_id; + const groupId = raw.group_id ?? item.group_id; + const eventId = typeof item.id === "string" && item.id.length > 0 + ? item.id + : [time || "unknown", messageId || fallbackIndex, userId || "unknown", groupId || "private"].join(":"); + + return { + id: eventId, + type: typeof item.type === "string" ? item.type : "message", + source: typeof item.source === "string" ? item.source : "messages-file", + timestamp: typeof item.timestamp === "string" + ? item.timestamp + : time + ? new Date(time * 1000).toISOString() + : new Date(0).toISOString(), + messageType: typeof raw.message_type === "string" ? raw.message_type : (groupId ? "group" : "private"), + userId: userId === undefined ? undefined : String(userId), + groupId: groupId === undefined ? undefined : String(groupId), + messageId, + text: textFromMessage(raw.message ?? item.message ?? item.text), + raw, + }; +} + +function readRecentEvents(searchParams) { + const limit = clampLimit(searchParams.get("limit")); + const since = searchParams.get("since"); + const messagesFile = path.join(workspaceDir, "messages", "messages.jsonl"); + try { + if (!fs.existsSync(messagesFile)) return []; + const lines = fs.readFileSync(messagesFile, "utf8").split(/\r?\n/).filter(Boolean); + const events = lines + .slice(-Math.max(limit * 2, limit)) + .map((line, index) => eventFromRecord(parseJson(line, {}), index)) + .filter((event) => !since || event.timestamp > since); + return events.slice(-limit).reverse(); + } catch { + return []; + } +} + +function readSubscriptions() { + const subscriptionsFile = path.join(stateDir, "event_subscriptions.json"); + try { + if (!fs.existsSync(subscriptionsFile)) return []; + const parsed = parseJson(fs.readFileSync(subscriptionsFile, "utf8"), []); + return Array.isArray(parsed) ? parsed.map(asRecord) : []; + } catch { + return []; + } +} + +function fallbackEventResponse(pathname, searchParams, upstream) { + if (pathname === "/api/events/recent") { + const events = readRecentEvents(searchParams); + return { + ok: true, + events, + count: events.length, + nextCursor: null, + source: "adapter-readonly-fallback", + degraded: true, + reason: "upstream-event-api-unavailable", + upstream: { ok: upstream.ok, status: upstream.status }, + }; + } + const subscriptions = readSubscriptions(); + return { + ok: true, + subscriptions, + count: subscriptions.length, + source: "adapter-readonly-fallback", + degraded: true, + reason: "upstream-event-api-unavailable", + upstream: { ok: upstream.ok, status: upstream.status }, + }; +} + +function normalizeEventApiBody(pathname, body, upstream) { + const record = asRecord(body); + if (record.ok === true) return record; + if (pathname === "/api/events/recent" && Array.isArray(record.events)) { + return { ...record, ok: true, count: record.events.length, source: "upstream", upstream: { ok: upstream.ok, status: upstream.status } }; + } + if (pathname === "/api/events/subscriptions" && Array.isArray(record.subscriptions)) { + return { ...record, ok: true, count: record.subscriptions.length, source: "upstream", upstream: { ok: upstream.ok, status: upstream.status } }; + } + return null; +} + +async function eventApiRead(res, url) { + const upstream = await upstreamFetch(`${url.pathname}${url.search}`); + if (upstream.ok && typeof upstream.body === "string") { + const normalized = normalizeEventApiBody(url.pathname, parseJson(upstream.body, null), upstream); + if (normalized) { + json(res, 200, normalized); + return; + } + } + json(res, 200, fallbackEventResponse(url.pathname, url.searchParams, upstream)); +} + function normalizePushPayload(raw) { const body = JSON.parse(raw.length > 0 ? raw.toString("utf8") : "{}"); const message = typeof body.message === "string" ? body.message : ""; @@ -70,7 +220,10 @@ function upstreamFetch(path, method = "GET") { const req = http.request({ host: upstreamHost, port: upstreamPort, path, method, timeout: 2000 }, (res) => { const chunks = []; res.on("data", (chunk) => chunks.push(Buffer.from(chunk))); - res.on("end", () => resolve({ ok: (res.statusCode || 500) < 500, status: res.statusCode || 0, body: Buffer.concat(chunks).toString("utf8") })); + res.on("end", () => { + const status = res.statusCode || 0; + resolve({ ok: status >= 200 && status < 300, status, body: Buffer.concat(chunks).toString("utf8") }); + }); }); req.on("timeout", () => req.destroy(new Error("timeout"))); req.on("error", (error) => resolve({ ok: false, status: 0, body: String(error?.message || error) })); @@ -140,6 +293,12 @@ const server = http.createServer(async (req, res) => { wsPort: statusNapcat.wsPort || 3001, }, adapter: "unidesk-claudeqq-adapter", + eventApi: { + contract: "claudeqq-event-api-v1", + readonlyFallback: true, + validationPaths: ["/api/events/recent", "/api/events/subscriptions"], + mutationPaths: ["POST /api/events/subscriptions", "DELETE /api/events/subscriptions/{id}"], + }, startedAt, upstream: { url: `http://${upstreamHost}:${upstreamPort}`, ok: upstreamHealth.ok, status: upstreamHealth.status }, deploy: { @@ -155,6 +314,10 @@ const server = http.createServer(async (req, res) => { await proxy({ ...req, method: "POST" }, res, "/api/send/text", body); return; } + if ((url.pathname === "/api/events/recent" || url.pathname === "/api/events/subscriptions") && req.method === "GET") { + await eventApiRead(res, url); + return; + } await proxy(req, res, `${url.pathname}${url.search}`, undefined); } catch (error) { json(res, 400, { ok: false, success: false, service: serviceId, error: String(error?.message || error) });