Files
pikasTech-unidesk/src/components/microservices/code-queue/src/code-agent/opencode.ts
T
2026-05-16 15:25:58 +00:00

428 lines
20 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Read-only reference to the pre-refactor index.ts: commit 6a04144d3f5103014f75b637d7e6bc2f45bf007f, blob 56e590c1a6b5ca7ad128bf2c992f60e46c355a58; view it with `git show 6a04144d3f5103014f75b637d7e6bc2f45bf007f:src/components/microservices/code-queue/src/index.ts`.
import { spawn, type ChildProcessWithoutNullStreams } from "node:child_process";
import { mkdirSync } from "node:fs";
import { resolve } from "node:path";
import * as readline from "node:readline";
import type { AppServerExit, CodexEventSummary, CodexRunResult, JsonValue, QueueTask, RuntimeConfig, TerminalStatus } from "../types";
import type { ActiveRun, CodeAgentClient } from "./common";
import { codeAgentGitConfigEntries, extractRecord, minimaxM27Model, normalizeCodeModel, stripAnsi, withCodeAgentGitConfigEnv } from "./common";
/**
 * Host-provided dependencies of the OpenCode port.
 *
 * A single instance is installed with `configureOpenCodePort` and read by the
 * rest of this module through `ctx()`. Comments below describe only how this
 * module uses each member.
 */
export interface OpenCodePortContext {
// Subset of the runtime configuration read by this module.
config: Pick<RuntimeConfig, "defaultWorkdir" | "minimaxApiBase" | "minimaxApiKey" | "minimaxModel" | "turnNoActivityTimeoutMs">;
// Registry of in-flight runs; this module keys it by `queueIdOf(task)`.
activeRuns: Map<string, ActiveRun>;
// Called once for every JSON record parsed from the opencode stdout stream.
addEvent: (task: QueueTask, event: CodexEventSummary) => void;
// Records a piece of output for the task on the given channel.
appendOutput: (task: QueueTask, channel: "system" | "assistant" | "reasoning" | "command" | "diff" | "tool" | "error", text: string, method?: string, itemId?: string, append?: boolean) => unknown;
// Supplies the container name and remote XDG directory used to build the remote `docker exec` command.
buildDevContainerPlan: (providerId: string, body: Record<string, unknown>) => { containerName: string; remoteOpencodeXdgDir: string };
compactRetryTaskContext: (task: QueueTask) => string;
// Awaited before an opencode process is spawned for the task.
ensureTaskExecutionContainer: (task: QueueTask) => Promise<void>;
judgeReasonForPrompt: (reason: string) => string;
// Structured logging callback.
logger: (level: "debug" | "info" | "warn" | "error", message: string, data?: JsonValue) => void;
// Timestamp source for the `at` field of event summaries.
nowIso: () => string;
// Builds the prompt used when a turn is re-run in a fresh session after the previous session was not found.
openCodeFreshRecoveryPrompt: (task: QueueTask, prompt: string, reason: string) => string;
// Returns environment variables whose values are directory paths; this module creates each directory
// and reads `XDG_DATA_HOME` from the no-argument result.
openCodeXdgEnv: (root?: string) => Record<string, string>;
// Called after this module mutates task fields such as `codexThreadId` / `activeTurnId`.
persistTaskState: (task: QueueTask) => void;
// When true, opencode is spawned directly; otherwise it is launched through `bun scripts/cli.ts ssh <providerId> ...`.
providerIsMain: (providerId: string) => boolean;
// Key under which the task's run is stored in `activeRuns`.
queueIdOf: (task: QueueTask) => string;
// Looks up a string field by trying `keys`; this module treats an empty result as "not present".
recordStringField: (record: Record<string, unknown> | null, keys: string[], max?: number) => string;
// Its result is passed as `workdir` to `buildDevContainerPlan`.
remoteHostWorkdirForTask: (task: QueueTask) => string;
// NOTE(review): presumably shortens `value` to at most `max` characters for logs/previews — confirm against the implementation.
safePreview: (value: string, max?: number) => string;
// Used to quote every value embedded into generated shell command lines.
shellQuote: (value: string) => string;
// Checked before attempting stale-session recovery.
shutdownRequested: () => boolean;
}
/** Module-level dependency container; stays `null` until `configureOpenCodePort` runs. */
let context: OpenCodePortContext | null = null;

/**
 * Installs the runtime context used by every function in this module.
 * Calling it again replaces the previously installed context.
 */
export function configureOpenCodePort(runtimeContext: OpenCodePortContext): void {
  context = runtimeContext;
}

/**
 * Returns the installed context.
 * @throws Error when `configureOpenCodePort` has not been called yet.
 */
function ctx(): OpenCodePortContext {
  const installed = context;
  if (installed !== null) {
    return installed;
  }
  throw new Error("opencode port is not configured");
}
/** Result of splitting raw model text into hidden reasoning and visible assistant text. */
interface OpenCodeTextParts {
// Trimmed contents of the think/thinking blocks, joined with blank lines ("" when there are none).
reasoning: string;
// Trimmed text that remains outside the think/thinking blocks.
assistant: string;
}
/**
 * Removes every closed `<think>…</think>` / `<thinking>…</thinking>` block
 * (case-insensitive, attributes allowed on the opening tag) and trims the result.
 * Falsy input is treated as the empty string; unclosed tags are left untouched.
 */
function stripThinkBlocks(text: string): string {
  const source = String(text || "");
  const closedThinkBlock = /<(?:think|thinking)\b[^>]*>[\s\S]*?<\/(?:think|thinking)>/giu;
  const visible = source.replace(closedThinkBlock, "");
  return visible.trim();
}
/**
 * Splits raw model text into reasoning and visible assistant text.
 *
 * - The body of every closed `<think>…</think>` / `<thinking>…</thinking>` block is
 *   collected as reasoning and removed from the text.
 * - If an orphan closing tag remains (its opening tag is missing from this text),
 *   only the text after the LAST such tag is treated as assistant text. The text
 *   before it is recorded as reasoning instead of being discarded; previously it
 *   was dropped, and a chunk such as `"plan</think>"` produced two empty parts, so
 *   the caller's raw-text fallback emitted the reasoning (tag included) as
 *   assistant output.
 *
 * Orphan reasoning pieces are appended after the reasoning from closed blocks.
 *
 * @param text Raw text; falsy values are treated as "".
 * @returns Trimmed `reasoning` (pieces joined by blank lines) and trimmed `assistant`.
 */
function splitOpenCodeAssistantText(text: string): OpenCodeTextParts {
  const reasoning: string[] = [];
  const pushReasoning = (value: unknown): void => {
    const item = String(value || "").trim();
    if (item.length > 0) reasoning.push(item);
  };

  const withoutClosedThink = String(text || "").replace(
    /<(?:think|thinking)\b[^>]*>([\s\S]*?)<\/(?:think|thinking)>/giu,
    (_match, body) => {
      pushReasoning(body);
      return "";
    },
  );

  // Locate the last orphan closing tag, if any.
  let orphanStart = -1;
  let assistantStart = 0;
  for (const match of withoutClosedThink.matchAll(/<\/(?:think|thinking)>/giu)) {
    const index = match.index ?? 0;
    orphanStart = index;
    assistantStart = index + match[0].length;
  }

  if (orphanStart < 0) {
    return { reasoning: reasoning.join("\n\n").trim(), assistant: withoutClosedThink.trim() };
  }

  // Everything before the last orphan closing tag is the tail of a think block;
  // split on any earlier orphan tags so the tags themselves are not kept.
  const orphanPrefix = withoutClosedThink.slice(0, orphanStart);
  for (const piece of orphanPrefix.split(/<\/(?:think|thinking)>/iu)) pushReasoning(piece);

  return {
    reasoning: reasoning.join("\n\n").trim(),
    assistant: withoutClosedThink.slice(assistantStart).trim(),
  };
}
/**
 * Maps a queue model name to the `provider/model` identifier passed to opencode.
 * Only the model that normalizes to `minimaxM27Model` is accepted; the provider
 * model name comes from configuration and falls back to "MiniMax-M2.7" when blank.
 * @throws Error for any other model.
 */
function openCodeModelId(model: string): string {
  const supported = normalizeCodeModel(model) === minimaxM27Model;
  if (!supported) {
    throw new Error(`OpenCode port does not support model ${model}`);
  }
  const configured = ctx().config.minimaxModel.trim();
  const providerModel = configured.length > 0 ? configured : "MiniMax-M2.7";
  return `minimax/${providerModel}`;
}
/**
 * Serializes the inline opencode configuration that declares the `minimax`
 * provider. The API key is not embedded: the config references the
 * `MINIMAX_API_KEY` environment variable through the `{env:…}` placeholder.
 */
function openCodeConfigContent(): string {
  const configuredModel = ctx().config.minimaxModel.trim();
  const providerModel = configuredModel.length > 0 ? configuredModel : "MiniMax-M2.7";

  const connection = {
    baseURL: ctx().config.minimaxApiBase,
    apiKey: "{env:MINIMAX_API_KEY}",
  };
  const modelEntry = {
    name: minimaxM27Model,
    limit: { context: 200000, output: 16384 },
  };
  const minimaxProvider = {
    npm: "@ai-sdk/openai-compatible",
    name: "MiniMax",
    options: connection,
    models: { [providerModel]: modelEntry },
  };
  const document = {
    $schema: "https://opencode.ai/config.json",
    provider: { minimax: minimaxProvider },
  };
  return JSON.stringify(document);
}
/**
 * Computes the per-task XDG root: `<base>/../tasks/<task.id>`, resolved to an
 * absolute path. `base` is `baseRoot` when given, otherwise the
 * `XDG_DATA_HOME` entry of the default XDG environment.
 */
function openCodeTaskXdgRoot(task: QueueTask, baseRoot?: string): string {
  const anchor = baseRoot != null ? baseRoot : ctx().openCodeXdgEnv().XDG_DATA_HOME;
  const segments = ["..", "tasks", task.id];
  return resolve(anchor, ...segments);
}
/** Creates (recursively) every directory listed as a value of `env`; existing directories are fine. */
function ensureOpenCodeXdgDirs(env: Record<string, string>): void {
  Object.values(env).forEach((directory) => {
    mkdirSync(directory, { recursive: true });
  });
}
/**
 * Builds the environment for a locally spawned opencode process: the current
 * process environment, overlaid with the task's XDG directories (created on
 * disk first), the MiniMax settings and the inline opencode config, then passed
 * through `withCodeAgentGitConfigEnv`.
 */
function openCodeEnv(task: QueueTask): NodeJS.ProcessEnv {
  const taskRoot = openCodeTaskXdgRoot(task);
  const xdgEnv = ctx().openCodeXdgEnv(taskRoot);
  ensureOpenCodeXdgDirs(xdgEnv);

  const apiKey = ctx().config.minimaxApiKey;
  const apiBase = ctx().config.minimaxApiBase;
  const modelName = ctx().config.minimaxModel;
  const configContent = openCodeConfigContent();

  return withCodeAgentGitConfigEnv({
    ...process.env,
    ...xdgEnv,
    MINIMAX_API_KEY: apiKey,
    MINIMAX_API_BASE: apiBase,
    MINIMAX_MODEL: modelName,
    OPENCODE_CONFIG_CONTENT: configContent,
  });
}
/** Quotes each argument with the context's `shellQuote` and joins them with single spaces. */
function shellJoin(args: string[]): string {
  const quote = ctx().shellQuote;
  const quoted: string[] = [];
  for (const arg of args) {
    quoted.push(quote(arg));
  }
  return quoted.join(" ");
}
/**
 * Builds the argv for `opencode run`: JSON output, the resolved model, the task
 * working directory and permission prompts disabled. `--session <id>` is added
 * only when the task already carries a non-blank session id; the prompt is
 * always the last argument.
 */
function openCodeRunArgs(task: QueueTask, prompt: string): string[] {
  const base: string[] = [
    "run",
    "--format",
    "json",
    "--model",
    openCodeModelId(task.model),
    "--dir",
    task.cwd,
    "--dangerously-skip-permissions",
  ];
  const sessionId = task.codexThreadId;
  return sessionId !== null && sessionId.trim().length > 0
    ? [...base, "--session", sessionId, prompt]
    : [...base, prompt];
}
/**
 * Builds the shell command line that runs opencode for `task` inside the
 * provider's dev container: `docker exec -i <container> bash -lc '<script>'`.
 *
 * The inner script enables strict mode, creates the per-task XDG directories
 * and the task working directory, changes into it, exports the environment
 * and finally `exec`s `opencode` with the arguments from `openCodeRunArgs`.
 *
 * Unlike the local environment built by `openCodeEnv`, `MINIMAX_API_KEY` is not
 * exported here, although the inline config references `{env:MINIMAX_API_KEY}`.
 * NOTE(review): presumably the key is already present in the container's
 * login-shell environment — confirm.
 */
function remoteOpenCodeRunCommand(task: QueueTask, prompt: string): string {
const plan = ctx().buildDevContainerPlan(task.providerId, { workdir: ctx().remoteHostWorkdirForTask(task) });
// Per-task XDG directories live under `<remoteOpencodeXdgDir>/tasks/<task.id>`.
const xdgEnv = ctx().openCodeXdgEnv(resolve(plan.remoteOpencodeXdgDir, "tasks", task.id));
const gitConfigEntries = codeAgentGitConfigEntries("");
const envExports = [
...Object.entries(xdgEnv).map(([key, value]) => `export ${key}=${ctx().shellQuote(value)}`),
`export MINIMAX_API_BASE=${ctx().shellQuote(ctx().config.minimaxApiBase)}`,
`export MINIMAX_MODEL=${ctx().shellQuote(ctx().config.minimaxModel)}`,
`export OPENCODE_CONFIG_CONTENT=${ctx().shellQuote(openCodeConfigContent())}`,
// `\${GIT_CONFIG_COUNT:-0}` is emitted literally so the REMOTE shell does the arithmetic:
// each entry is appended at index (existing GIT_CONFIG_COUNT, default 0) + its position here.
...gitConfigEntries.map(([key, value], index) => `export GIT_CONFIG_KEY_$((\${GIT_CONFIG_COUNT:-0}+${index}))=${ctx().shellQuote(key)} GIT_CONFIG_VALUE_$((\${GIT_CONFIG_COUNT:-0}+${index}))=${ctx().shellQuote(value)}`),
// The count is bumped only after all entries were written, so the indices above stay stable.
`export GIT_CONFIG_COUNT=$((\${GIT_CONFIG_COUNT:-0}+${gitConfigEntries.length}))`,
].join("; ");
const inner = [
"set -euo pipefail",
`mkdir -p ${Object.values(xdgEnv).map(ctx().shellQuote).join(" ")}`,
`mkdir -p ${ctx().shellQuote(task.cwd)}`,
`cd ${ctx().shellQuote(task.cwd)}`,
envExports,
// `exec` replaces the shell with opencode.
`exec opencode ${shellJoin(openCodeRunArgs(task, prompt))}`,
].join("; ");
// The whole script is quoted once more so it reaches `bash -lc` as a single argument.
return `docker exec -i ${ctx().shellQuote(plan.containerName)} bash -lc ${ctx().shellQuote(inner)}`;
}
/** Test-only export that returns exactly what the private `remoteOpenCodeRunCommand` builds. */
export function remoteOpenCodeRunCommandForTest(task: QueueTask, prompt: string): string {
  const command = remoteOpenCodeRunCommand(task, prompt);
  return command;
}
/**
 * Decides whether the opencode process ended before the turn reached a terminal state.
 *
 * A non-zero exit always counts as closed-before-terminal. With a clean exit, the
 * turn is terminal if either a final response is present or a step-finish event
 * was seen: some OpenCode JSON streams exit cleanly with visible assistant text
 * but no step_finish event, and treating those as non-terminal would make the
 * judge safety gate retry an already completed turn.
 */
export function openCodeTransportClosedBeforeTerminal(exitOk: boolean, hasFinalResponse: boolean, stepFinished: boolean): boolean {
  if (!exitOk) {
    return true;
  }
  const reachedTerminal = hasFinalResponse || stepFinished;
  return !reachedTerminal;
}
/**
 * Extracts the session id from an event, preferring the top-level record over
 * its nested `part`. Accepts the `sessionID`, `sessionId` and `session_id`
 * spellings; returns `null` when neither location yields a non-empty value.
 */
function openCodeSessionIdFromRecord(record: Record<string, unknown>, part: Record<string, unknown> | null): string | null {
  for (const source of [record, part]) {
    const candidate = ctx().recordStringField(source, ["sessionID", "sessionId", "session_id"]);
    if (candidate.length > 0) {
      return candidate;
    }
  }
  return null;
}
/**
 * Condenses one parsed opencode JSON event into a `CodexEventSummary`.
 *
 * The event type comes from the record (`type` / `event` / `name`), then from
 * `part.type`, and finally defaults to "unknown". Text, error and status are
 * looked up on `part` first and on the record second; empty values become
 * `undefined`, and error/text are shortened with `safePreview`.
 */
function openCodeEventSummary(record: Record<string, unknown>): CodexEventSummary {
  const part = extractRecord(record.part);
  const fromPart = (keys: string[]): string => ctx().recordStringField(part, keys);
  const fromRecord = (keys: string[]): string => ctx().recordStringField(record, keys);

  const type = fromRecord(["type", "event", "name"]) || fromPart(["type"]) || "unknown";
  const text = fromPart(["text", "content", "delta", "message"]) || fromRecord(["text", "content", "delta", "message"]);
  const error = fromPart(["error", "message"]) || fromRecord(["error", "message"]);

  const hasError = error.length > 0;
  const hasText = text.length > 0;
  return {
    at: ctx().nowIso(),
    method: `opencode/${type}`,
    itemType: fromPart(["type"]) || type,
    status: fromPart(["status", "reason"]) || fromRecord(["status", "reason"]) || undefined,
    message: hasError ? ctx().safePreview(error, 600) : undefined,
    textPreview: hasText ? ctx().safePreview(text, 800) : undefined,
  };
}
/**
 * Runs one `opencode run --format json` process for a task turn and translates
 * its line-delimited stdout into task events and output.
 *
 * The process is spawned directly for the main provider and through
 * `bun scripts/cli.ts ssh <providerId> <command>` otherwise. `closedPromise`
 * resolves exactly once, when the child emits `close` or `error`; it is never
 * rejected.
 */
class OpenCodeRunClient implements CodeAgentClient {
  /** Upper bound, in bytes, for buffered stderr (the newest chunk is always kept). */
  private static readonly stderrBufferLimitBytes = 96_000;

  private child: ChildProcessWithoutNullStreams;
  private stderrChunks: Buffer[] = [];
  /** Combined byte length of `stderrChunks`, maintained incrementally. */
  private stderrBytes = 0;
  private closed = false;
  private closeResolve!: (value: AppServerExit) => void;
  private assistantChunks: string[] = [];
  private sessionAnnounced = false;
  /** Resolves with the exit details once the child has closed or failed to start. */
  readonly closedPromise: Promise<AppServerExit>;
  readonly runId = `opencode_${Date.now()}_${Math.random().toString(16).slice(2, 8)}`;
  /** Summaries of every JSON event parsed so far. */
  readonly events: CodexEventSummary[] = [];
  sessionId: string | null = null;
  /** Visible assistant text accumulated so far, with think blocks removed. */
  finalResponse = "";
  /** Set once a step-finish event has been seen. */
  stepFinished = false;
  /** Epoch milliseconds of the last non-empty stdout line (or construction time). */
  lastActivityAt = Date.now();

  /**
   * Spawns the opencode process immediately.
   * @param task Task the run belongs to; several of its fields are updated as output arrives.
   * @param prompt Prompt passed to opencode as the final argument.
   */
  constructor(private readonly task: QueueTask, prompt: string) {
    this.closedPromise = new Promise((resolveClosed) => { this.closeResolve = resolveClosed; });
    this.child = ctx().providerIsMain(task.providerId)
      ? spawn("opencode", openCodeRunArgs(task, prompt), {
        cwd: task.cwd,
        env: openCodeEnv(task),
        stdio: "pipe",
      })
      : spawn("bun", ["scripts/cli.ts", "ssh", task.providerId, remoteOpenCodeRunCommand(task, prompt)], {
        cwd: ctx().config.defaultWorkdir,
        env: process.env,
        stdio: "pipe",
      });
    // opencode waits for stdin EOF even when the prompt is supplied as argv.
    // Close stdin immediately so non-interactive queue runs can start.
    this.child.stdin.end();
    this.child.stderr.on("data", (chunk: Buffer) => {
      this.stderrChunks.push(chunk);
      this.stderrBytes += chunk.length;
      // Drop the oldest chunks while over budget. The byte counter avoids
      // re-concatenating the whole buffer on every check, and the newest chunk is
      // always kept so one oversized chunk cannot wipe out the stderr tail.
      while (this.stderrBytes > OpenCodeRunClient.stderrBufferLimitBytes && this.stderrChunks.length > 1) {
        const dropped = this.stderrChunks.shift();
        if (dropped !== undefined) this.stderrBytes -= dropped.length;
      }
    });
    const rl = readline.createInterface({ input: this.child.stdout, crlfDelay: Infinity });
    void this.readLines(rl);
    this.child.on("close", (code, signal) => this.handleClose(code, signal));
    // A spawn failure is reported as exit code 127 with the error message in the signal slot.
    this.child.on("error", (error) => this.handleClose(127, error.message));
  }

  /** Sends SIGTERM, then SIGKILL after 1.5 s if the child is still open. No-op once closed. */
  stop(): void {
    if (this.closed) return;
    this.child.kill("SIGTERM");
    setTimeout(() => {
      if (!this.closed) this.child.kill("SIGKILL");
    }, 1500).unref?.();
  }

  /** Consumes stdout line by line; stream errors are reported as task error output. */
  private async readLines(rl: readline.Interface): Promise<void> {
    try {
      for await (const line of rl) {
        const trimmed = String(line).trim();
        if (trimmed.length === 0) continue;
        this.lastActivityAt = Date.now();
        this.handleLine(trimmed);
      }
    } catch (error) {
      ctx().appendOutput(this.task, "error", `opencode stream error: ${error instanceof Error ? error.message : String(error)}\n`, "opencode/stream");
    }
  }

  /**
   * Handles one non-empty stdout line. Lines that are not JSON records are
   * treated as plain assistant text; JSON records are recorded as events and
   * routed by type to the matching output channel.
   */
  private handleLine(line: string): void {
    let parsed: Record<string, unknown> | null = null;
    try {
      const value = JSON.parse(line) as unknown;
      parsed = extractRecord(value);
    } catch {
      this.appendAssistantText(line, "opencode/stdout", undefined);
      return;
    }
    if (parsed === null) {
      this.appendAssistantText(line, "opencode/stdout", undefined);
      return;
    }
    const part = extractRecord(parsed.part);
    const event = openCodeEventSummary(parsed);
    this.events.push(event);
    ctx().addEvent(this.task, event);
    const sessionId = openCodeSessionIdFromRecord(parsed, part);
    if (sessionId !== null) this.setSessionId(sessionId);
    const type = ctx().recordStringField(parsed, ["type", "event", "name"]) || ctx().recordStringField(part, ["type"]) || "unknown";
    const partType = ctx().recordStringField(part, ["type"]);
    const itemId = ctx().recordStringField(part, ["id"]) || undefined;
    if (type === "step_finish" || type === "step-finish" || partType === "step-finish") {
      this.stepFinished = true;
      return;
    }
    if (type === "step_start" || type === "step-start" || partType === "step-start") {
      return;
    }
    const text = ctx().recordStringField(part, ["text", "content", "delta"]) || ctx().recordStringField(parsed, ["text", "content", "delta"]);
    if (text.length > 0 && (type === "text" || partType === "text" || partType === "reasoning")) {
      if (partType === "reasoning") ctx().appendOutput(this.task, "reasoning", `${text.trimEnd()}\n`, "opencode/reasoning", itemId, true);
      else this.appendAssistantText(text, "opencode/text", itemId);
      return;
    }
    if (/tool|bash|command/iu.test(`${type} ${partType}`)) {
      ctx().appendOutput(this.task, type.includes("command") || partType.includes("command") ? "command" : "tool", `${JSON.stringify(parsed)}\n`, "opencode/tool", itemId);
      return;
    }
    if (/error|failed/iu.test(`${type} ${partType}`)) {
      ctx().appendOutput(this.task, "error", `${ctx().safePreview(JSON.stringify(parsed), 3000)}\n`, "opencode/error", itemId);
    }
  }

  /**
   * Records the session id on the client, on the task (only when the task has
   * none yet) and on this client's active-run entry. The start/resume message
   * is emitted once per client.
   */
  private setSessionId(sessionId: string): void {
    this.sessionId = sessionId;
    if (this.task.codexThreadId === null) this.task.codexThreadId = sessionId;
    const run = ctx().activeRuns.get(ctx().queueIdOf(this.task));
    if (run?.app === this) run.threadId = sessionId;
    if (this.sessionAnnounced) return;
    const resumed = this.task.currentMode === "retry" || this.task.attempts.length > 0;
    ctx().appendOutput(this.task, "system", `opencode session ${resumed ? "resumed" : "started"} ${sessionId}\n`, resumed ? "opencode/session-resume" : "opencode/session-start");
    this.sessionAnnounced = true;
  }

  /**
   * Splits `rawText` into reasoning and visible text, forwards each to its
   * channel and refreshes `finalResponse` on both the client and the task.
   */
  private appendAssistantText(rawText: string, method: string, itemId: string | undefined): void {
    const parts = splitOpenCodeAssistantText(rawText);
    if (parts.reasoning.length > 0) ctx().appendOutput(this.task, "reasoning", `${parts.reasoning.trimEnd()}\n`, "opencode/reasoning", itemId, true);
    // Fall back to the raw text only when the split produced neither part.
    const visible = parts.assistant.length > 0 ? parts.assistant : parts.reasoning.length > 0 ? "" : rawText.trim();
    if (visible.length === 0) return;
    this.assistantChunks.push(visible);
    this.finalResponse = stripThinkBlocks(this.assistantChunks.join("\n\n").trim());
    this.task.finalResponse = this.finalResponse;
    ctx().appendOutput(this.task, "assistant", `${visible.trimEnd()}\n`, method, itemId, true);
  }

  /** Resolves `closedPromise` once with the exit details and the last 8000 characters of stderr. */
  private handleClose(code: number | null, signal: string | null): void {
    if (this.closed) return;
    this.closed = true;
    this.closeResolve({ code, signal, stderrTail: Buffer.concat(this.stderrChunks).toString("utf8").slice(-8000) });
  }
}
/**
 * Formats a human-readable error for a failed opencode exit: the exit code and
 * signal, followed by a preview of the ANSI-stripped stderr tail when there is any.
 */
function openCodeExitError(exit: AppServerExit): string {
  const summary = `OpenCode exited with code=${exit.code} signal=${exit.signal}`;
  const cleanedStderr = stripAnsi(exit.stderrTail).trim();
  if (cleanedStderr.length === 0) {
    return summary;
  }
  return `${summary}: ${ctx().safePreview(cleanedStderr, 800)}`;
}
/**
 * Reports whether a run failed because the requested session does not exist,
 * by looking for "Session not found" (case-insensitive) in the terminal error
 * or the stderr tail after stripping ANSI sequences.
 */
function openCodeSessionMissing(result: CodexRunResult): boolean {
  const combined = (result.terminalError ?? "") + "\n" + result.appServerExit.stderrTail;
  const cleaned = stripAnsi(combined);
  return /Session not found/iu.test(cleaned);
}
/**
 * Runs one opencode turn and, if it failed because the task's stored session
 * no longer exists, retries once in a fresh session.
 *
 * Recovery is skipped when the task had no session before the run, when the
 * task was cancelled, when shutdown was requested, or when the failure is not a
 * missing-session error. Before retrying, the stale session id and active turn
 * id are cleared and the task is persisted; the recovery prompt is built only
 * after that.
 */
export async function runOpenCodeTurn(task: QueueTask, prompt: string): Promise<CodexRunResult> {
  const staleSessionId = task.codexThreadId;
  const firstResult = await runOpenCodeTurnOnce(task, prompt);

  if (staleSessionId === null) return firstResult;
  if (task.cancelRequested || ctx().shutdownRequested()) return firstResult;
  if (!openCodeSessionMissing(firstResult)) return firstResult;

  const reason = firstResult.terminalError ?? firstResult.appServerExit.stderrTail;
  ctx().appendOutput(
    task,
    "system",
    `opencode session ${staleSessionId} was not found; clearing stale session and starting a fresh OpenCode session via the opencode port\n`,
    "opencode/session-recovery",
  );
  ctx().logger("warn", "opencode_session_missing_recover_fresh", {
    taskId: task.id,
    sessionId: staleSessionId,
    reason: ctx().safePreview(stripAnsi(reason), 500),
  });

  task.codexThreadId = null;
  task.activeTurnId = null;
  ctx().persistTaskState(task);

  const recoveryPrompt = ctx().openCodeFreshRecoveryPrompt(task, prompt, reason);
  return runOpenCodeTurnOnce(task, recoveryPrompt);
}
/**
 * Executes a single opencode process for `task` and converts its outcome into a
 * `CodexRunResult`.
 *
 * Sequence: fail fast when no MiniMax API key is configured; make sure the
 * task's execution container exists; spawn the client and register it in
 * `activeRuns`; start an inactivity watchdog; wait for the process to close;
 * derive the terminal status. The run counts as "completed" only when the
 * process exited with code 0 AND produced a non-empty final response.
 *
 * Failures after the process is spawned are reported through the returned
 * result (`terminalStatus: "failed"`). Errors thrown before the `try` block —
 * e.g. by `ensureTaskExecutionContainer` or by the client constructor —
 * propagate to the caller.
 */
async function runOpenCodeTurnOnce(task: QueueTask, prompt: string): Promise<CodexRunResult> {
const queueId = ctx().queueIdOf(task);
// Without an API key no process is spawned; a synthetic failed result is returned.
if (ctx().config.minimaxApiKey.length === 0) {
const message = "MINIMAX_API_KEY is required for opencode model minimax-m2.7.";
ctx().appendOutput(task, "error", `${message}\n`, "opencode/config");
return {
threadId: task.codexThreadId,
turnId: null,
finalResponse: "",
terminalStatus: "failed",
terminalError: message,
transportClosedBeforeTerminal: true,
appServerExit: { code: 1, signal: null, stderrTail: message },
events: [],
};
}
await ctx().ensureTaskExecutionContainer(task);
// Constructing the client spawns the process immediately.
const app = new OpenCodeRunClient(task, prompt);
ctx().activeRuns.set(queueId, { taskId: task.id, queueId, app, port: "opencode", threadId: task.codexThreadId, turnId: app.runId });
task.activeTurnId = null;
ctx().persistTaskState(task);
// Every 15 s, stop the run if no stdout line arrived within the configured timeout.
// The interval keeps firing (and calling stop()) until it is cleared below.
const activityWatchdog = setInterval(() => {
const idleMs = Date.now() - app.lastActivityAt;
if (idleMs < ctx().config.turnNoActivityTimeoutMs) return;
const message = `No OpenCode activity for ${Math.round(idleMs / 1000)}s; stopping opencode run so the existing session can retry.`;
ctx().appendOutput(task, "error", `${message}\n`, "turn/no-activity-watchdog");
ctx().logger("warn", "opencode_no_activity_watchdog", { taskId: task.id, runId: app.runId, idleMs, timeoutMs: ctx().config.turnNoActivityTimeoutMs });
app.stop();
}, 15_000);
try {
const exit = await app.closedPromise;
clearInterval(activityWatchdog);
const finalResponse = app.finalResponse.trim().length > 0 ? app.finalResponse : "";
const exitOk = exit.code === 0;
const hasFinal = finalResponse.trim().length > 0;
const status: TerminalStatus = exitOk && hasFinal ? "completed" : "failed";
// A clean exit without any assistant text is still a failure, with its own message.
const terminalError = status === "completed"
? null
: exitOk
? "OpenCode returned no final assistant response."
: openCodeExitError(exit);
const stderr = stripAnsi(exit.stderrTail).trim();
ctx().appendOutput(
task,
status === "completed" ? "system" : "error",
`opencode completed status=${status} exit=${exit.code ?? "null"} signal=${exit.signal ?? "null"}${stderr.length > 0 ? ` stderr=${ctx().safePreview(stderr, 1200)}` : ""}\n`,
"opencode/complete",
);
return {
threadId: app.sessionId ?? task.codexThreadId,
turnId: app.runId,
finalResponse,
terminalStatus: status,
terminalError,
transportClosedBeforeTerminal: openCodeTransportClosedBeforeTerminal(exitOk, hasFinal, app.stepFinished),
appServerExit: exit,
events: app.events,
};
} catch (error) {
// Reached when anything inside the try block throws; make sure the process is
// stopped and wait for it to close before reporting the failure.
clearInterval(activityWatchdog);
const message = error instanceof Error ? error.message : String(error);
ctx().appendOutput(task, "error", `${message}\n`, "opencode");
app.stop();
const exit = await app.closedPromise;
return {
threadId: app.sessionId ?? task.codexThreadId,
turnId: app.runId,
finalResponse: app.finalResponse.trim().length > 0 ? app.finalResponse : "",
terminalStatus: "failed",
terminalError: message,
transportClosedBeforeTerminal: true,
appServerExit: exit,
events: app.events,
};
} finally {
clearInterval(activityWatchdog);
// Remove the registry entry only if it still belongs to this run.
if (ctx().activeRuns.get(queueId)?.app === app) ctx().activeRuns.delete(queueId);
// No-op when the process has already closed.
app.stop();
}
}