feat: wire wechat archive through langbot and n8n

This commit is contained in:
Codex
2026-06-13 04:32:52 +00:00
parent e47eca77ab
commit c818d0cd44
8 changed files with 653 additions and 78 deletions
+13
View File
@@ -17,6 +17,12 @@ langbot:
publicBaseUrl: https://langbot.pikapython.com
expectedAdapter: openclaw-weixin
callbackPath: /callback/command
pipeline:
name: UniDesk WeChat Baidu Archive
description: Forward WeChat/OpenClaw messages to the UniDesk n8n archive workflow.
runner: n8n-service-api
outputKey: response
timeoutSeconds: 120
notes:
- LangBot receives WeChat/OpenClaw messages and forwards archive events to the n8n workflow webhook.
- Credentials for real WeChat channels are bound in LangBot runtime and are not stored in this YAML.
@@ -31,6 +37,13 @@ n8n:
active: true
timezone: Asia/Shanghai
archiveCallback:
publicUrl: http://74.48.78.17:18081/webhooks/wechat-archive
secretRoot: /root/unidesk/.state/secrets
tokenSourceRef: platform-infra/wechat-archive.env
tokenKey: UNIDESK_WECHAT_ARCHIVE_TOKEN
timeoutMs: 90000
baiduNetdisk:
serviceId: baidu-netdisk
proxyMode: backend-core-microservice-proxy
+10
View File
@@ -53,6 +53,16 @@ sources:
N8N_ENCRYPTION_KEY:
bytes: 32
prefix: ""
- sourceRef: platform-infra/wechat-archive.env
type: env
requiredKeys:
- UNIDESK_WECHAT_ARCHIVE_TOKEN
createIfMissing:
enabled: true
randomBase64Url:
UNIDESK_WECHAT_ARCHIVE_TOKEN:
bytes: 32
prefix: uwa_
targets:
- id: platform-infra-g14
+1
View File
@@ -390,6 +390,7 @@ services:
PROVIDER_TOKEN: "${UNIDESK_PROVIDER_TOKEN}"
UNIDESK_SSH_CLIENT_TOKEN: "${UNIDESK_SSH_CLIENT_TOKEN:-}"
UNIDESK_SSH_CLIENT_ROUTE_ALLOWLIST: "${UNIDESK_SSH_CLIENT_ROUTE_ALLOWLIST:-G14,G14:*,D601,D601:*}"
UNIDESK_WECHAT_ARCHIVE_TOKEN: "${UNIDESK_WECHAT_ARCHIVE_TOKEN:-}"
SESSION_SECRET: "${UNIDESK_SESSION_SECRET}"
SESSION_TTL_SECONDS: "${UNIDESK_SESSION_TTL_SECONDS}"
UNIDESK_DEPLOY_REF: "${UNIDESK_FRONTEND_DEPLOY_REF:-deploy.json#environments.prod.services.frontend}"
+3
View File
@@ -62,7 +62,10 @@
- WeChat-to-Baidu archive automation is a shared platform workflow, not a separate service-specific fork. Its durable source of truth is `config/platform-infra/wechat-archive.yaml`; the canonical entrypoint is `bun scripts/cli.ts platform-infra wechat-archive plan|apply|status|validate|pull`.
- The workflow composes the existing LangBot public service, existing n8n public service, and the private `baidu-netdisk` microservice. LangBot remains the chat ingress, n8n owns webhook normalization/orchestration, and Baidu upload/download is performed through backend-core microservice proxy so Baidu OAuth tokens are never exposed in G14 or CLI output.
- Text and image archive policy, remote path templates, staging roots, webhook path, timeout and validation fixtures must stay in YAML. CLI code may validate the YAML shape and render n8n workflow JSON, but it must not hard-code current path roots, credentials, message channel IDs, or Baidu account choices outside YAML/service runtime.
- The archive callback token is controlled by `archiveCallback.secretRoot`, `archiveCallback.tokenSourceRef`, and `archiveCallback.tokenKey` in YAML plus `config/secrets-distribution.yaml`. `secrets sync` may create the local source when YAML explicitly allows it; n8n receives the token only through controlled workflow rendering. Do not recover this token from the n8n database, frontend runtime, Baidu runtime, pod env, or logs.
- For the current n8n runtime, production webhook reachability uses the registered path shape `workflowId/nodeName/webhookPath`; workflow node names used in generated webhooks should be ASCII path-safe, and `webhookPath` in YAML should remain one relative path segment.
- Generated n8n workflows should use n8n-native HTTP Request nodes for outbound service callbacks. Code nodes may normalize payloads, but must not assume sandbox globals such as `fetch` exist in the runtime.
- If LangBot or n8n public HTTPS fails while in-cluster service and FRP local-port probes are healthy, restore the PK01 Caddy managed blocks through `platform-infra langbot apply --confirm --wait` or `platform-infra n8n apply --confirm --wait`. Do not manually edit Caddy as the durable fix.
- The archive uses the same single PK01/Pika01 PostgreSQL instance indirectly through the existing LangBot and n8n databases. Adding this workflow must not create another PostgreSQL instance, in-cluster PostgreSQL StatefulSet, or ad hoc database namespace.
- `platform-infra-wechat-archive` and future similar public workflow CLIs should reuse the common platform-infra operations library for YAML parsing, target selection, workflow sync, private microservice proxy calls, transfer polling, staging path mapping, redaction and bounded output. Service-specific modules should keep only their business mapping and workflow payload rendering.
- Closeout requires `platform-infra wechat-archive apply --confirm --wait`, `platform-infra wechat-archive status`, `platform-infra wechat-archive validate --full`, and a `platform-infra wechat-archive pull` command that retrieves an uploaded file by remote path or `fsId` and reports local path plus hash.
+29 -2
View File
@@ -1014,6 +1014,29 @@ function prepareSecretMaterial(langbot: LangBotConfig): SecretMaterial {
};
}
export function readLangBotRuntimeConfig(): Record<string, unknown> {
const langbot = readLangBotConfig();
const target = resolveTarget(langbot, "G14");
return {
publicBaseUrl: target.publicExposure.publicBaseUrl,
expectedNamespace: target.namespace,
apiKeyName: langbot.apiKey.key,
valuesPrinted: false,
};
}
export function readLangBotSecretMaterial(): Record<string, unknown> {
const langbot = readLangBotConfig();
const secret = prepareSecretMaterial(langbot);
return {
apiKey: secret.values.apiKey,
apiKeyFingerprint: fingerprintValues({ [langbot.apiKey.key]: secret.values.apiKey }, [langbot.apiKey.key]),
sourceRef: langbot.apiKey.sourceRef,
keyName: langbot.apiKey.key,
valuesPrinted: false,
};
}
function prepareFrpcSecret(langbot: LangBotConfig, target: LangBotTarget): FrpcSecretMaterial {
const exposure = target.publicExposure;
const sourcePath = join(secretRoot(langbot), exposure.frpc.tokenSourceRef);
@@ -1556,7 +1579,7 @@ function publicHttpProbe(baseUrl: string, path: string, apiKey: string | null):
url,
status: Number.isInteger(status) ? status : null,
bodyBytes: Buffer.byteLength(body, "utf8"),
bodyPreview: body.slice(0, 2000),
bodyPreview: redactText(body).slice(0, 2000),
stderrTail: redactText(stderr).slice(-2000),
apiKeyUsed: apiKey !== null,
valuesPrinted: false,
@@ -1708,7 +1731,11 @@ function compactCapture(result: SshCaptureResult, options: { full?: boolean } =
}
function redactText(text: string): string {
return text.replace(/lbk_[A-Za-z0-9_-]+/gu, "lbk_<redacted>").replace(/postgresql:\/\/[^@\s]+@/gu, "postgresql://<redacted>@");
return text
.replace(/lbk_[A-Za-z0-9_-]+/gu, "lbk_<redacted>")
.replace(/(postgres(?:ql)?:\/\/)[^@\s"']+@/giu, "$1<redacted>@")
.replace(/(Bearer\s+)[A-Za-z0-9._~+/=-]+/giu, "$1<redacted>")
.replace(/(["']?(?:token|password|secret|api[_-]?key|apikey|jwt[_-]?secret|database[_-]?url)["']?\s*[:=]\s*["']?)[^"',\s}]+(["']?)/giu, "$1<redacted>$2");
}
function redactRepoPath(path: string): string {
+422 -76
View File
@@ -1,5 +1,5 @@
import { createHash, randomUUID } from "node:crypto";
import { existsSync } from "node:fs";
import { randomUUID } from "node:crypto";
import { existsSync, readFileSync } from "node:fs";
import pathPosix from "node:path/posix";
import type { UniDeskConfig } from "./config";
import { rootPath } from "./config";
@@ -11,7 +11,6 @@ import {
compactProxyResponse,
compactUnknown,
containerPathToHostPath,
dateInTimeZone,
fetchJsonWithTimeout,
findBaiduFileByRemotePath,
hostRootPath,
@@ -24,8 +23,6 @@ import {
readYamlRecord,
redactSensitiveUnknown,
recordField,
relativeStagingPath,
renderTemplate,
repoRelative,
resolveRepoPath,
sanitizePathSegment,
@@ -33,11 +30,11 @@ import {
stringField,
syncN8nWorkflow,
waitForBaiduTransfer,
writeStagingBase64,
writeStagingText,
type OpsApplyOptions,
type OpsCommonOptions,
} from "./platform-infra-ops-library";
import { readLangBotRuntimeConfig, readLangBotSecretMaterial } from "./platform-infra-langbot";
import { fingerprintValues } from "./platform-infra-public-service";
const configFile = rootPath("config", "platform-infra", "wechat-archive.yaml");
const configLabel = "config/platform-infra/wechat-archive.yaml";
@@ -47,12 +44,26 @@ interface WechatArchiveConfig {
kind: "platform-infra-wechat-archive";
metadata: { id: string; owner: string; relatedIssues: number[] };
target: { id: string; route: string; namespace: string };
langbot: { configRef: string; publicBaseUrl: string; expectedAdapter: string; callbackPath: string; notes: string[] };
langbot: {
configRef: string;
publicBaseUrl: string;
expectedAdapter: string;
callbackPath: string;
pipeline: { name: string; description: string; runner: string; outputKey: string; timeoutSeconds: number };
notes: string[];
};
n8n: {
configRef: string;
publicBaseUrl: string;
workflow: { name: string; id: string; webhookPath: string; active: boolean; timezone: string };
};
archiveCallback: {
publicUrl: string;
secretRoot: string;
tokenSourceRef: string;
tokenKey: string;
timeoutMs: number;
};
baiduNetdisk: {
serviceId: string;
proxyMode: string;
@@ -132,7 +143,7 @@ function parsePullOptions(args: string[]): PullOptions {
function plan(options: OpsCommonOptions): Record<string, unknown> {
const archive = readConfig();
assertTarget(archive, options.targetId);
const workflow = renderN8nWorkflow(archive);
const workflow = renderN8nWorkflow(archive, null);
const policy = policyChecks(archive);
return {
ok: policy.every((check) => check.ok),
@@ -176,7 +187,8 @@ async function apply(config: UniDeskConfig, options: OpsApplyOptions): Promise<R
statusCommand: `bun scripts/cli.ts job status ${job.id} --tail-bytes 12000`,
};
}
const workflowJson = renderN8nWorkflow(archive);
const callbackToken = options.dryRun ? null : readArchiveCallbackToken(archive);
const workflowJson = renderN8nWorkflow(archive, callbackToken?.value ?? null);
const sync = await syncN8nWorkflow(config, {
targetRoute: archive.target.route,
namespace: archive.target.namespace,
@@ -188,14 +200,22 @@ async function apply(config: UniDeskConfig, options: OpsApplyOptions): Promise<R
active: archive.n8n.workflow.active,
dryRun: options.dryRun,
});
const langbotBinding = options.dryRun ? langBotDryRun(archive) : await syncLangBotPipeline(archive);
return {
ok: sync.ok,
ok: sync.ok && langbotBinding.ok === true,
action: "platform-infra-wechat-archive-apply",
mode: options.dryRun ? "dry-run" : "confirmed",
mutation: !options.dryRun,
config: configSummary(archive),
policy,
n8nWorkflow: sync,
archiveCallbackAuth: callbackToken === null ? { mode: "dry-run", valuesPrinted: false } : {
sourceRef: callbackToken.sourceRef,
keyName: callbackToken.keyName,
fingerprint: callbackToken.fingerprint,
valuesPrinted: false,
},
langbotBinding,
next: {
status: `bun scripts/cli.ts platform-infra wechat-archive status --target ${archive.target.id}`,
validate: `bun scripts/cli.ts platform-infra wechat-archive validate --target ${archive.target.id} --full`,
@@ -209,10 +229,12 @@ async function status(options: OpsCommonOptions): Promise<Record<string, unknown
const health = microserviceProxy(archive.baiduNetdisk.serviceId, "/health", { timeoutMs: 30_000 });
const auth = microserviceProxy(archive.baiduNetdisk.serviceId, "/api/auth/status", { timeoutMs: 30_000 });
const transfers = microserviceProxy(archive.baiduNetdisk.serviceId, "/api/transfers?limit=10", { timeoutMs: 30_000 });
const langbot = await inspectLangBotBinding(archive);
return {
ok: health.ok && auth.ok,
ok: health.ok && auth.ok && langbot.ok === true,
action: "platform-infra-wechat-archive-status",
config: configSummary(archive),
langbot,
n8n: {
webhookUrl: webhookUrl(archive),
workflowId: archive.n8n.workflow.id,
@@ -236,15 +258,17 @@ async function validate(options: OpsCommonOptions): Promise<Record<string, unkno
const imagePayload = fixturePayload(archive, "image", observedAt);
const n8nText = await callWorkflow(archive, textPayload);
const n8nImage = await callWorkflow(archive, imagePayload);
const textArchive = await archiveMessage(archive, textPayload, n8nText);
const imageArchive = await archiveMessage(archive, imagePayload, n8nImage);
const textPull = await pullByFsId(archive, String(textArchive.fsId || ""), pullLocalRel(archive, textArchive.remotePath));
const imagePull = await pullByFsId(archive, String(imageArchive.fsId || ""), pullLocalRel(archive, imageArchive.remotePath));
const ok = n8nText.ok === true && n8nImage.ok === true && textArchive.ok === true && imageArchive.ok === true && textPull.ok === true && imagePull.ok === true;
const langbot = await inspectLangBotBinding(archive);
const textArchive = archiveFromWorkflowResponse(textPayload, n8nText);
const imageArchive = archiveFromWorkflowResponse(imagePayload, n8nImage);
const textPull = await pullArchiveIfReady(archive, textArchive);
const imagePull = await pullArchiveIfReady(archive, imageArchive);
const ok = langbot.ok === true && n8nText.ok === true && n8nImage.ok === true && textArchive.ok === true && imageArchive.ok === true && textPull.ok === true && imagePull.ok === true;
return {
ok,
action: "platform-infra-wechat-archive-validate",
config: configSummary(archive),
langbot,
fixtures: {
text: validationSummary(textPayload, n8nText, textArchive, textPull, options.full),
image: validationSummary(imagePayload, n8nImage, imageArchive, imagePull, options.full),
@@ -292,6 +316,7 @@ function readConfig(): WechatArchiveConfig {
const langbot = recordField(root, "langbot", configLabel);
const n8n = recordField(root, "n8n", configLabel);
const n8nWorkflow = recordField(n8n, "workflow", `${configLabel}.n8n`);
const archiveCallback = recordField(root, "archiveCallback", configLabel);
const baiduNetdisk = recordField(root, "baiduNetdisk", configLabel);
const staging = recordField(baiduNetdisk, "staging", `${configLabel}.baiduNetdisk`);
const archive = recordField(baiduNetdisk, "archive", `${configLabel}.baiduNetdisk`);
@@ -332,6 +357,7 @@ function readConfig(): WechatArchiveConfig {
publicBaseUrl: stringField(langbot, "publicBaseUrl", configLabel),
expectedAdapter: stringField(langbot, "expectedAdapter", configLabel),
callbackPath: stringField(langbot, "callbackPath", configLabel),
pipeline: parseLangBotPipeline(recordField(langbot, "pipeline", `${configLabel}.langbot`)),
notes: Array.isArray(langbot.notes) ? langbot.notes.map(String) : [],
},
n8n: {
@@ -345,6 +371,13 @@ function readConfig(): WechatArchiveConfig {
timezone: stringField(n8nWorkflow, "timezone", `${configLabel}.n8n.workflow`),
},
},
archiveCallback: {
publicUrl: stringField(archiveCallback, "publicUrl", `${configLabel}.archiveCallback`),
secretRoot: stringField(archiveCallback, "secretRoot", `${configLabel}.archiveCallback`),
tokenSourceRef: stringField(archiveCallback, "tokenSourceRef", `${configLabel}.archiveCallback`),
tokenKey: stringField(archiveCallback, "tokenKey", `${configLabel}.archiveCallback`),
timeoutMs: numberField(archiveCallback, "timeoutMs", `${configLabel}.archiveCallback`),
},
baiduNetdisk: {
serviceId: stringField(baiduNetdisk, "serviceId", `${configLabel}.baiduNetdisk`),
proxyMode: stringField(baiduNetdisk, "proxyMode", `${configLabel}.baiduNetdisk`),
@@ -397,6 +430,8 @@ function validateConfig(config: WechatArchiveConfig): void {
}
if (!config.n8n.publicBaseUrl.startsWith("https://")) throw new Error(`${configLabel}.n8n.publicBaseUrl must be https`);
if (!config.langbot.publicBaseUrl.startsWith("https://")) throw new Error(`${configLabel}.langbot.publicBaseUrl must be https`);
if (!/^https?:\/\//u.test(config.archiveCallback.publicUrl)) throw new Error(`${configLabel}.archiveCallback.publicUrl must be http or https`);
if (!/^[A-Za-z_][A-Za-z0-9_]*$/u.test(config.archiveCallback.tokenKey)) throw new Error(`${configLabel}.archiveCallback.tokenKey must be an env key`);
if (!config.n8n.workflow.webhookPath || config.n8n.workflow.webhookPath.startsWith("/") || config.n8n.workflow.webhookPath.includes("/")) {
throw new Error(`${configLabel}.n8n.workflow.webhookPath must be one relative path segment`);
}
@@ -419,6 +454,7 @@ function configSummary(config: WechatArchiveConfig): Record<string, unknown> {
publicBaseUrl: config.langbot.publicBaseUrl,
expectedAdapter: config.langbot.expectedAdapter,
callbackPath: config.langbot.callbackPath,
pipeline: config.langbot.pipeline,
},
n8n: {
configRef: config.n8n.configRef,
@@ -426,6 +462,14 @@ function configSummary(config: WechatArchiveConfig): Record<string, unknown> {
workflow: config.n8n.workflow,
webhookUrl: webhookUrl(config),
},
archiveCallback: {
publicUrl: config.archiveCallback.publicUrl,
secretRoot: config.archiveCallback.secretRoot,
tokenSourceRef: config.archiveCallback.tokenSourceRef,
tokenKey: config.archiveCallback.tokenKey,
timeoutMs: config.archiveCallback.timeoutMs,
valuesPrinted: false,
},
baiduNetdisk: {
serviceId: config.baiduNetdisk.serviceId,
proxyMode: config.baiduNetdisk.proxyMode,
@@ -443,6 +487,7 @@ function policyChecks(config: WechatArchiveConfig): Array<Record<string, unknown
{ name: "langbot-yaml-ref", ok: config.langbot.configRef === "config/platform-infra/langbot.yaml", detail: "LangBot base service remains declared in YAML." },
{ name: "n8n-yaml-ref", ok: config.n8n.configRef === "config/platform-infra/n8n.yaml", detail: "n8n base service remains declared in YAML." },
{ name: "baidu-private-proxy", ok: config.baiduNetdisk.proxyMode === "backend-core-microservice-proxy", detail: "Baidu Netdisk access stays behind backend-core microservice proxy." },
{ name: "callback-token-source-ref", ok: Boolean(config.archiveCallback.tokenSourceRef && config.archiveCallback.tokenKey), detail: "n8n archive callback token is read from a YAML-declared local sourceRef and never reverse-engineered from runtime." },
{ name: "single-pk01-postgres", ok: true, detail: "LangBot and n8n use dedicated databases inside the single PK01/Pika01 external PostgreSQL instance; this workflow adds no PostgreSQL instance." },
{ name: "text-enabled", ok: config.messageTypes.text?.enabled === true, detail: "Text messages are archive-enabled." },
{ name: "image-enabled", ok: config.messageTypes.image?.enabled === true, detail: "Image messages are archive-enabled." },
@@ -450,11 +495,230 @@ function policyChecks(config: WechatArchiveConfig): Array<Record<string, unknown
];
}
function parseLangBotPipeline(raw: Record<string, unknown>): WechatArchiveConfig["langbot"]["pipeline"] {
const timeoutSeconds = numberField(raw, "timeoutSeconds", `${configLabel}.langbot.pipeline`);
return {
name: stringField(raw, "name", `${configLabel}.langbot.pipeline`),
description: stringField(raw, "description", `${configLabel}.langbot.pipeline`),
runner: stringField(raw, "runner", `${configLabel}.langbot.pipeline`),
outputKey: stringField(raw, "outputKey", `${configLabel}.langbot.pipeline`),
timeoutSeconds,
};
}
function webhookUrl(config: WechatArchiveConfig): string {
return `${config.n8n.publicBaseUrl.replace(/\/+$/u, "")}/webhook/${registeredWebhookPath(config)}`;
}
function renderN8nWorkflow(config: WechatArchiveConfig): Record<string, unknown> {
function langBotDryRun(config: WechatArchiveConfig): Record<string, unknown> {
return {
ok: true,
mode: "dry-run",
pipeline: {
name: config.langbot.pipeline.name,
runner: config.langbot.pipeline.runner,
webhookUrl: webhookUrl(config),
outputKey: config.langbot.pipeline.outputKey,
timeoutSeconds: config.langbot.pipeline.timeoutSeconds,
},
bot: {
expectedAdapter: config.langbot.expectedAdapter,
desiredUsePipelineName: config.langbot.pipeline.name,
},
valuesPrinted: false,
};
}
async function syncLangBotPipeline(config: WechatArchiveConfig): Promise<Record<string, unknown>> {
const runtime = readLangBotRuntimeConfig();
const secret = readLangBotSecretMaterial();
const apiKey = String(secret.apiKey || "");
const baseUrl = String(runtime.publicBaseUrl || config.langbot.publicBaseUrl);
const pipelinesResp = await langBotRequest(baseUrl, apiKey, "GET", "/api/v1/pipelines");
const pipelines = arrayFromPath(pipelinesResp.body, ["data", "pipelines"]);
let pipeline = pipelines.find((item) => String(asRecord(item, "pipeline").name || "") === config.langbot.pipeline.name);
let created = false;
if (pipeline === undefined) {
const createdResp = await langBotRequest(baseUrl, apiKey, "POST", "/api/v1/pipelines", {
name: config.langbot.pipeline.name,
description: config.langbot.pipeline.description,
emoji: "archive",
});
const uuid = String(nestedValue(createdResp.body, ["data", "uuid"]) || "");
if (!uuid) throw new Error("LangBot create pipeline did not return uuid");
const getResp = await langBotRequest(baseUrl, apiKey, "GET", `/api/v1/pipelines/${encodeURIComponent(uuid)}`);
pipeline = asRecord(nestedValue(getResp.body, ["data", "pipeline"]), "created pipeline");
created = true;
}
const pipelineRecord = asRecord(pipeline, "pipeline");
const pipelineUuid = String(pipelineRecord.uuid || "");
if (!pipelineUuid) throw new Error("LangBot pipeline record is missing uuid");
const configRecord = asRecord(pipelineRecord.config, "pipeline.config");
const nextConfig = langBotPipelineConfig(configRecord, config);
await langBotRequest(baseUrl, apiKey, "PUT", `/api/v1/pipelines/${encodeURIComponent(pipelineUuid)}`, {
name: config.langbot.pipeline.name,
description: config.langbot.pipeline.description,
config: nextConfig,
});
const botsResp = await langBotRequest(baseUrl, apiKey, "GET", "/api/v1/platform/bots");
const bots = arrayFromPath(botsResp.body, ["data", "bots"]);
const bot = bots.find((item) => String(asRecord(item, "bot").adapter || "") === config.langbot.expectedAdapter);
if (bot === undefined) throw new Error(`LangBot bot adapter ${config.langbot.expectedAdapter} not found`);
const botRecord = asRecord(bot, "bot");
const botUuid = String(botRecord.uuid || "");
if (!botUuid) throw new Error("LangBot bot record is missing uuid");
const alreadyBound = String(botRecord.use_pipeline_uuid || "") === pipelineUuid;
if (!alreadyBound) {
await langBotRequest(baseUrl, apiKey, "PUT", `/api/v1/platform/bots/${encodeURIComponent(botUuid)}`, { use_pipeline_uuid: pipelineUuid });
}
return {
ok: true,
mode: "confirmed",
pipeline: {
uuid: pipelineUuid,
name: config.langbot.pipeline.name,
runner: config.langbot.pipeline.runner,
webhookUrl: webhookUrl(config),
created,
},
bot: {
uuid: botUuid,
adapter: config.langbot.expectedAdapter,
bound: true,
wasAlreadyBound: alreadyBound,
usePipelineUuid: pipelineUuid,
usePipelineName: config.langbot.pipeline.name,
},
auth: {
sourceRef: secret.sourceRef,
keyName: secret.keyName,
apiKeyFingerprint: secret.apiKeyFingerprint,
valuesPrinted: false,
},
valuesPrinted: false,
};
}
async function inspectLangBotBinding(config: WechatArchiveConfig): Promise<Record<string, unknown>> {
try {
const runtime = readLangBotRuntimeConfig();
const secret = readLangBotSecretMaterial();
const apiKey = String(secret.apiKey || "");
const baseUrl = String(runtime.publicBaseUrl || config.langbot.publicBaseUrl);
const pipelinesResp = await langBotRequest(baseUrl, apiKey, "GET", "/api/v1/pipelines");
const pipelines = arrayFromPath(pipelinesResp.body, ["data", "pipelines"]);
const pipeline = pipelines.find((item) => String(asRecord(item, "pipeline").name || "") === config.langbot.pipeline.name);
const pipelineRecord = pipeline === undefined ? null : asRecord(pipeline, "pipeline");
const pipelineUuid = String(pipelineRecord?.uuid || "");
const pipelineConfig = typeof pipelineRecord?.config === "object" && pipelineRecord.config !== null && !Array.isArray(pipelineRecord.config)
? pipelineRecord.config as Record<string, unknown>
: {};
const runner = String(nestedValue(pipelineConfig, ["ai", "runner", "runner"]) || "");
const webhook = String(nestedValue(pipelineConfig, ["ai", "n8n-service-api", "webhook-url"]) || "");
const botsResp = await langBotRequest(baseUrl, apiKey, "GET", "/api/v1/platform/bots");
const bots = arrayFromPath(botsResp.body, ["data", "bots"]);
const bot = bots.find((item) => String(asRecord(item, "bot").adapter || "") === config.langbot.expectedAdapter);
const botRecord = bot === undefined ? null : asRecord(bot, "bot");
const botPipelineUuid = String(botRecord?.use_pipeline_uuid || "");
const ok = Boolean(pipelineUuid)
&& runner === config.langbot.pipeline.runner
&& webhook === webhookUrl(config)
&& botPipelineUuid === pipelineUuid;
return {
ok,
pipeline: {
exists: Boolean(pipelineUuid),
uuid: pipelineUuid || null,
name: config.langbot.pipeline.name,
runner,
webhookMatches: webhook === webhookUrl(config),
},
bot: {
exists: botRecord !== null,
uuid: String(botRecord?.uuid || "") || null,
adapter: config.langbot.expectedAdapter,
usePipelineUuid: botPipelineUuid || null,
bound: Boolean(pipelineUuid) && botPipelineUuid === pipelineUuid,
},
auth: {
sourceRef: secret.sourceRef,
keyName: secret.keyName,
apiKeyFingerprint: secret.apiKeyFingerprint,
valuesPrinted: false,
},
valuesPrinted: false,
};
} catch (error) {
return {
ok: false,
error: error instanceof Error ? error.message : String(error),
valuesPrinted: false,
};
}
}
function langBotPipelineConfig(current: Record<string, unknown>, archive: WechatArchiveConfig): Record<string, unknown> {
const next = JSON.parse(JSON.stringify(current)) as Record<string, unknown>;
const ai = asMutableRecord(next, "ai");
const runner = asMutableRecord(ai, "runner");
runner.runner = archive.langbot.pipeline.runner;
runner["expire-time"] = 0;
const n8n = asMutableRecord(ai, "n8n-service-api");
n8n["webhook-url"] = webhookUrl(archive);
n8n["auth-type"] = "none";
n8n.timeout = archive.langbot.pipeline.timeoutSeconds;
n8n["output-key"] = archive.langbot.pipeline.outputKey;
return next;
}
async function langBotRequest(baseUrl: string, apiKey: string, method: string, apiPath: string, body?: unknown): Promise<Record<string, unknown>> {
const controller = new AbortController();
const timer = setTimeout(() => controller.abort(), 45_000);
try {
const response = await fetch(`${baseUrl.replace(/\/+$/u, "")}${apiPath}`, {
method,
headers: { "content-type": "application/json", "x-api-key": apiKey },
body: body === undefined ? undefined : JSON.stringify(body),
signal: controller.signal,
});
const text = await response.text();
let parsed: unknown = text;
try {
parsed = text.length > 0 ? JSON.parse(text) as unknown : null;
} catch {
parsed = text;
}
if (!response.ok) throw new Error(`LangBot ${method} ${apiPath} failed with HTTP ${response.status}: ${JSON.stringify(compactUnknown(parsed)).slice(0, 800)}`);
return { ok: true, status: response.status, body: parsed };
} finally {
clearTimeout(timer);
}
}
function nestedValue(value: unknown, path: string[]): unknown {
let current = value;
for (const part of path) {
if (typeof current !== "object" || current === null || Array.isArray(current)) return undefined;
current = (current as Record<string, unknown>)[part];
}
return current;
}
function arrayFromPath(value: unknown, path: string[]): unknown[] {
const nested = nestedValue(value, path);
return Array.isArray(nested) ? nested : [];
}
function asMutableRecord(parent: Record<string, unknown>, key: string): Record<string, unknown> {
const current = parent[key];
if (typeof current === "object" && current !== null && !Array.isArray(current)) return current as Record<string, unknown>;
const next: Record<string, unknown> = {};
parent[key] = next;
return next;
}
function renderN8nWorkflow(config: WechatArchiveConfig, callbackToken: string | null): Record<string, unknown> {
const tokenForWorkflow = callbackToken ?? "__UNIDESK_WECHAT_ARCHIVE_TOKEN_FROM_SOURCE_REF__";
const code = `
const input = $input.first()?.json ?? {};
const body = input.body && typeof input.body === 'object' ? input.body : input;
@@ -465,11 +729,57 @@ function pick(...keys) {
}
return '';
}
function sanitizePathSegment(value, fallback = 'unknown') {
const normalized = String(value || fallback)
.normalize('NFKC')
.replace(/[\\\\/\\0\\r\\n\\t]+/g, '-')
.replace(/[^A-Za-z0-9._-]+/g, '-')
.replace(/^-+|-+$/g, '')
.slice(0, 120);
return normalized.length > 0 ? normalized : fallback;
}
function dateInTimeZone(date, timeZone) {
const parts = new Intl.DateTimeFormat('en-CA', {
timeZone,
year: 'numeric',
month: '2-digit',
day: '2-digit',
}).formatToParts(date).reduce((acc, part) => {
acc[part.type] = part.value;
return acc;
}, {});
return [parts.year, parts.month, parts.day].join('-');
}
function renderTemplate(template, values) {
return template.replace(/\\{\\{\\s*([A-Za-z0-9_]+)\\s*\\}\\}/g, (_match, key) => values[key] || '');
}
function normalizeRemotePath(path) {
const collapsed = String(path).replace(/\\/+/g, '/');
return collapsed.startsWith('/') ? collapsed : '/' + collapsed;
}
const messageType = String(pick('messageType', 'type', 'msgType') || 'text').toLowerCase();
const receivedAt = String(pick('receivedAt', 'timestamp') || new Date().toISOString());
const messageId = String(pick('messageId', 'msgId', 'id') || String(Date.now()));
const conversationId = String(pick('conversationId', 'chatId', 'roomId', 'fromUser') || 'unknown-conversation');
const senderId = String(pick('senderId', 'userId', 'fromUserName', 'fromUser') || 'unknown-sender');
const receivedDate = Number.isNaN(new Date(receivedAt).getTime()) ? new Date() : new Date(receivedAt);
const messageId = sanitizePathSegment(pick('messageId', 'msgId', 'id', 'message_id') || String(Date.now()), 'message');
const conversationId = sanitizePathSegment(pick('conversationId', 'chatId', 'roomId', 'fromUser', 'conversation_id', 'session_id') || 'unknown-conversation', 'conversation');
const senderId = sanitizePathSegment(pick('senderId', 'userId', 'fromUserName', 'fromUser', 'user_id') || 'unknown-sender', 'sender');
const media = body.media && typeof body.media === 'object' ? body.media : {};
const extension = messageType === 'image' ? '${config.baiduNetdisk.archive.imageExtension}' : '${config.baiduNetdisk.archive.textExtension}';
const filename = messageType === 'image'
? sanitizePathSegment(media.filename || messageId + '.' + extension, messageId + '.' + extension)
: messageId + '.' + extension;
const text = String(pick('text', 'content', 'message', 'chatInput', 'query', 'query_text') || '');
const remotePath = normalizeRemotePath(renderTemplate('${config.baiduNetdisk.archive.pathTemplate}', {
remoteRoot: '${config.baiduNetdisk.archive.remoteRoot}',
date: dateInTimeZone(receivedDate, '${config.baiduNetdisk.archive.timezone}'),
timestamp: receivedDate.toISOString().replace(/[-:.TZ]/g, ''),
conversationId,
senderId,
messageId,
type: messageType,
extension,
filename,
}));
const normalized = {
ok: true,
source: 'n8n',
@@ -479,13 +789,15 @@ const normalized = {
conversationId,
senderId,
messageType,
text: String(pick('text', 'content') || ''),
text,
receivedAt,
media: body.media && typeof body.media === 'object' ? body.media : {},
media,
},
archive: {
remoteRoot: '${config.baiduNetdisk.archive.remoteRoot}',
pathTemplate: '${config.baiduNetdisk.archive.pathTemplate}',
remotePath,
filename,
},
};
return [{ json: normalized }];
@@ -516,11 +828,38 @@ return [{ json: normalized }];
typeVersion: 2,
position: [280, 0],
},
{
parameters: {
method: "POST",
url: config.archiveCallback.publicUrl,
sendHeaders: true,
headerParameters: {
parameters: [
{ name: "Authorization", value: `Bearer ${tokenForWorkflow}` },
{ name: "Content-Type", value: "application/json" },
],
},
sendBody: true,
specifyBody: "json",
jsonBody: "={{ JSON.stringify($json) }}",
options: {
timeout: config.archiveCallback.timeoutMs,
},
},
id: "archive-callback",
name: "Archive To Baidu Callback",
type: "n8n-nodes-base.httpRequest",
typeVersion: 4.2,
position: [560, 0],
},
],
connections: {
"wechat-archive-webhook": {
main: [[{ node: "Normalize WeChat Message", type: "main", index: 0 }]],
},
"Normalize WeChat Message": {
main: [[{ node: "Archive To Baidu Callback", type: "main", index: 0 }]],
},
},
settings: {
executionOrder: "v1",
@@ -570,7 +909,7 @@ function fixturePayload(config: WechatArchiveConfig, type: "text" | "image", obs
}
async function callWorkflow(config: WechatArchiveConfig, payload: Record<string, unknown>): Promise<Record<string, unknown>> {
const response = await fetchJsonWithTimeout(webhookUrl(config), payload, 45_000);
const response = await fetchJsonWithTimeout(webhookUrl(config), payload, Math.max(45_000, config.archiveCallback.timeoutMs + 30_000));
const body = typeof response.body === "object" && response.body !== null && !Array.isArray(response.body)
? response.body as Record<string, unknown>
: {};
@@ -582,7 +921,7 @@ async function callWorkflow(config: WechatArchiveConfig, payload: Record<string,
};
}
async function archiveMessage(config: WechatArchiveConfig, originalPayload: Record<string, unknown>, n8nResponse: Record<string, unknown>): Promise<Record<string, unknown>> {
function archiveFromWorkflowResponse(originalPayload: Record<string, unknown>, n8nResponse: Record<string, unknown>): Record<string, unknown> {
if (n8nResponse.ok !== true) {
return {
ok: false,
@@ -594,59 +933,22 @@ async function archiveMessage(config: WechatArchiveConfig, originalPayload: Reco
};
}
const body = asRecord(n8nResponse.body ?? {}, "n8n response body");
const archive = asRecord(body.archive ?? {}, "n8n response archive");
const normalized = asRecord(body.message ?? originalPayload, "normalized message");
const type = String(normalized.messageType ?? originalPayload.messageType ?? "text").toLowerCase();
const messageId = sanitizePathSegment(String(normalized.messageId ?? originalPayload.messageId ?? randomUUID()));
const conversationId = sanitizePathSegment(String(normalized.conversationId ?? originalPayload.conversationId ?? "unknown-conversation"));
const senderId = sanitizePathSegment(String(normalized.senderId ?? originalPayload.senderId ?? "unknown-sender"));
const receivedAt = new Date(String(normalized.receivedAt ?? originalPayload.receivedAt ?? new Date().toISOString()));
const extension = type === "image" ? config.baiduNetdisk.archive.imageExtension : config.baiduNetdisk.archive.textExtension;
const remotePath = normalizeRemotePath(renderTemplate(config.baiduNetdisk.archive.pathTemplate, {
remoteRoot: config.baiduNetdisk.archive.remoteRoot,
date: dateInTimeZone(receivedAt, config.baiduNetdisk.archive.timezone),
timestamp: receivedAt.toISOString().replace(/[-:.TZ]/gu, ""),
conversationId,
senderId,
messageId,
type,
extension,
filename: `${messageId}.${extension}`,
}));
const localRel = relativeStagingPath(config.baiduNetdisk.staging.outboxDir, `${messageId}.${extension}`);
const written = type === "image"
? writeStagingBase64({ hostRoot: config.baiduNetdisk.staging.hostRoot, relativePath: localRel, dataBase64: String(asRecord(normalized.media ?? originalPayload.media ?? {}, "image media").dataBase64 ?? "") })
: writeStagingText({ hostRoot: config.baiduNetdisk.staging.hostRoot, relativePath: localRel, content: messageText(normalized, originalPayload) });
const created = assertProxyOk(microserviceProxy(config.baiduNetdisk.serviceId, "/api/transfers/upload-from-path", {
method: "POST",
body: { localPath: localRel, remotePath },
timeoutMs: 60_000,
}), "baidu upload create");
const job = asRecord(created.job, "baidu upload job");
const jobId = String(job.id || "");
const waited = await waitForBaiduTransfer(config.baiduNetdisk.serviceId, jobId, {
timeoutMs: config.validate.timeoutMs,
pollIntervalMs: config.validate.pollIntervalMs,
});
const waitedJob = asRecord(waited.job ?? {}, "baidu waited upload job");
const result = asRecord(waitedJob.result ?? {}, "baidu upload result");
const baiduResult = asRecord(result.baidu ?? {}, "baidu upload result.baidu");
let fsId = String(baiduResult.fs_id ?? baiduResult.fsId ?? waitedJob.fsId ?? "");
let listed: Record<string, unknown> | null = null;
if (!fsId) {
listed = findBaiduFileByRemotePath(config.baiduNetdisk.serviceId, remotePath);
fsId = String(listed?.fsId ?? listed?.fs_id ?? "");
}
const remotePath = String(archive.remotePath || "");
const fsId = String(archive.fsId || archive.fs_id || "");
return {
ok: waited.ok === true && Boolean(fsId),
ok: body.ok === true && Boolean(remotePath) && Boolean(fsId),
type,
remotePath,
fsId,
messageId,
conversationId,
senderId,
local: { relativePath: localRel, hostPath: written.hostPath, bytes: written.bytes, sha256: written.sha256 },
uploadJob: waitedJob,
listed: redactBaiduFileEntry(listed),
messageId: normalized.messageId ?? originalPayload.messageId,
conversationId: normalized.conversationId ?? originalPayload.conversationId,
senderId: normalized.senderId ?? originalPayload.senderId,
local: typeof archive.local === "object" && archive.local !== null && !Array.isArray(archive.local) ? archive.local : {},
uploadJob: typeof archive.uploadJob === "object" && archive.uploadJob !== null && !Array.isArray(archive.uploadJob) ? archive.uploadJob : {},
listed: typeof archive.listed === "object" && archive.listed !== null && !Array.isArray(archive.listed) ? redactBaiduFileEntry(archive.listed as Record<string, unknown>) : null,
};
}
@@ -678,10 +980,19 @@ async function pullByFsId(config: WechatArchiveConfig, fsId: string, localRel: s
};
}
function messageText(normalized: Record<string, unknown>, originalPayload: Record<string, unknown>): string {
const text = String(normalized.text ?? originalPayload.text ?? originalPayload.content ?? "");
const payloadHash = createHash("sha256").update(JSON.stringify(originalPayload)).digest("hex");
return `${text}\n\n---\nsource=unidesk-wechat-archive\npayloadSha256=${payloadHash}\n`;
async function pullArchiveIfReady(config: WechatArchiveConfig, archive: Record<string, unknown>): Promise<Record<string, unknown>> {
const fsId = String(archive.fsId || "");
const remotePath = String(archive.remotePath || "");
if (archive.ok !== true || !fsId || !remotePath) {
return {
ok: false,
skipped: true,
reason: "archive-upload-not-ready",
fsId,
remotePath,
};
}
return await pullByFsId(config, fsId, pullLocalRel(config, remotePath));
}
function pullLocalRel(config: WechatArchiveConfig, remotePath: unknown): string {
@@ -727,3 +1038,38 @@ function redactBaiduFileEntry(value: Record<string, unknown> | null): Record<str
if (copy.dlink !== undefined) copy.dlink = "<redacted-download-link>";
return copy;
}
function readArchiveCallbackToken(config: WechatArchiveConfig): { sourceRef: string; keyName: string; value: string; fingerprint: string } {
const sourcePath = pathPosix.join(config.archiveCallback.secretRoot, config.archiveCallback.tokenSourceRef);
if (!existsSync(sourcePath)) {
throw new Error(`${config.archiveCallback.tokenSourceRef} is missing; run bun scripts/cli.ts secrets sync --config config/secrets-distribution.yaml --scope platform-infra --confirm first`);
}
const values = parseEnvFile(readFileSync(sourcePath, "utf8"));
const value = values[config.archiveCallback.tokenKey] ?? "";
if (value.length === 0) throw new Error(`${config.archiveCallback.tokenSourceRef} is missing required key ${config.archiveCallback.tokenKey}`);
return {
sourceRef: config.archiveCallback.tokenSourceRef,
keyName: config.archiveCallback.tokenKey,
value,
fingerprint: fingerprintValues({ [config.archiveCallback.tokenKey]: value }, [config.archiveCallback.tokenKey]),
};
}
function parseEnvFile(text: string): Record<string, string> {
const result: Record<string, string> = {};
for (const rawLine of text.split(/\r?\n/u)) {
const line = rawLine.trim();
if (line.length === 0 || line.startsWith("#")) continue;
const eq = line.indexOf("=");
if (eq <= 0) continue;
const key = line.slice(0, eq).trim();
if (!/^[A-Za-z_][A-Za-z0-9_]*$/u.test(key)) continue;
result[key] = unquoteEnvValue(line.slice(eq + 1).trim());
}
return result;
}
function unquoteEnvValue(value: string): string {
if ((value.startsWith("'") && value.endsWith("'")) || (value.startsWith("\"") && value.endsWith("\""))) return value.slice(1, -1);
return value;
}
+142
View File
@@ -15,6 +15,7 @@ interface RuntimeConfig {
providerToken: string | null;
sshClientToken: string | null;
sshClientRouteAllowlist: string[];
wechatArchiveToken: string | null;
sessionSecret: string;
sessionTtlSeconds: number;
logFile: string;
@@ -211,6 +212,8 @@ function readConfig(): RuntimeConfig {
sshClientToken: optionalEnv("UNIDESK_SSH_CLIENT_TOKEN")
?? optionalFileEnv("UNIDESK_SSH_CLIENT_TOKEN_FILE", "/run/secrets/unidesk_ssh_client_token"),
sshClientRouteAllowlist: parseRouteAllowlist(optionalEnv("UNIDESK_SSH_CLIENT_ROUTE_ALLOWLIST") ?? "G14,G14:*,D601,D601:*"),
wechatArchiveToken: optionalEnv("UNIDESK_WECHAT_ARCHIVE_TOKEN")
?? optionalFileEnv("UNIDESK_WECHAT_ARCHIVE_TOKEN_FILE", "/tmp/unidesk-wechat-archive-token"),
sessionSecret: requiredEnv("SESSION_SECRET"),
sessionTtlSeconds: readNumberEnv("SESSION_TTL_SECONDS"),
logFile: requiredEnv("LOG_FILE"),
@@ -319,6 +322,7 @@ function trimPerformanceBuffers(): void {
function classifyRequestComponent(pathname: string): string {
if (pathname === "/api/frontend-performance") return "webui_performance";
if (pathname === "/webhooks/wechat-archive") return "wechat_archive_webhook";
if (pathname.startsWith("/api/") || pathname === "/logs") return "webui_api_proxy";
if (pathname === "/login" || pathname === "/logout" || pathname === "/api/session") return "webui_auth";
if (pathname === "/app.js" || pathname.startsWith("/vendor/") || /\/[^/]+\.[a-z0-9]+$/iu.test(pathname)) return "webui_static";
@@ -789,6 +793,143 @@ function sessionResponse(req: Request): Response {
return jsonResponse({ ok: true, authenticated: true, user: { username: session.username }, expiresAt: new Date(session.expiresAt).toISOString() });
}
async function readJsonRecord(req: Request, maxBytes: number): Promise<Record<string, unknown>> {
const bytes = await req.arrayBuffer();
if (bytes.byteLength > maxBytes) throw new Error(`request body exceeds ${maxBytes} bytes`);
const text = Buffer.from(bytes).toString("utf8");
const parsed = JSON.parse(text) as unknown;
if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) throw new Error("request body must be a JSON object");
return parsed as Record<string, unknown>;
}
function recordFrom(value: unknown): Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value) ? value as Record<string, unknown> : {};
}
function stringFrom(value: unknown): string {
return typeof value === "string" ? value : value === undefined || value === null ? "" : String(value);
}
function wechatArchiveTokenFromRequest(req: Request): string | null {
return bearerTokenFromRequest(req) ?? req.headers.get("x-unidesk-wechat-archive-token")?.trim() ?? null;
}
function isValidArchiveRemotePath(path: string): boolean {
return path.startsWith("/UniDesk/WeChatArchive/")
&& !path.includes("\0")
&& !path.split("/").includes("..")
&& path.length <= 1024;
}
function archiveContentPayload(body: Record<string, unknown>): Record<string, unknown> {
const message = recordFrom(body.message);
const archive = recordFrom(body.archive);
const media = recordFrom(message.media);
const messageType = stringFrom(message.messageType || body.messageType || "text").toLowerCase();
const remotePath = stringFrom(archive.remotePath);
if (!isValidArchiveRemotePath(remotePath)) throw new Error("archive.remotePath is outside the WeChat archive root");
const filename = stringFrom(archive.filename || remotePath.split("/").filter(Boolean).pop() || "wechat-archive.txt");
if (messageType === "image") {
const dataBase64 = stringFrom(media.dataBase64 || archive.dataBase64 || body.dataBase64);
if (!dataBase64) throw new Error("image archive payload is missing media.dataBase64");
return { dataBase64, filename, remotePath, maxBytes: 10 * 1024 * 1024 };
}
const text = stringFrom(message.text || body.text || body.message || body.content);
const payloadHash = new Bun.CryptoHasher("sha256").update(JSON.stringify(body)).digest("hex");
const content = `${text}\n\n---\nsource=unidesk-wechat-archive\npayloadSha256=${payloadHash}\n`;
return { content, filename, remotePath, maxBytes: 10 * 1024 * 1024 };
}
async function coreJson(path: string, init?: RequestInit): Promise<Record<string, unknown>> {
const response = await fetch(new URL(path, config.coreInternalUrl), {
...init,
headers: {
"content-type": "application/json",
...(init?.headers instanceof Headers ? Object.fromEntries(init.headers.entries()) : init?.headers as Record<string, string> | undefined),
},
});
const text = await response.text();
let parsed: unknown = text;
try {
parsed = text.length > 0 ? JSON.parse(text) : null;
} catch {
parsed = text;
}
if (!response.ok) return { ok: false, status: response.status, body: parsed };
return typeof parsed === "object" && parsed !== null && !Array.isArray(parsed)
? { ok: true, status: response.status, ...(parsed as Record<string, unknown>) }
: { ok: true, status: response.status, body: parsed };
}
async function waitBaiduTransfer(jobId: string, timeoutMs: number): Promise<Record<string, unknown>> {
const deadline = Date.now() + timeoutMs;
let last: Record<string, unknown> = { ok: false, error: "not-polled" };
while (Date.now() < deadline) {
last = await coreJson(`/api/microservices/baidu-netdisk/proxy/api/transfers/${encodeURIComponent(jobId)}`);
const job = recordFrom(last.job);
const status = stringFrom(job.status || last.status);
if (status === "succeeded") return { ok: true, job };
if (status === "failed" || status === "canceled") return { ok: false, job };
await Bun.sleep(1000);
}
return { ok: false, error: "baidu-transfer-timeout", last };
}
function baiduFsIdFromJob(job: Record<string, unknown>): string {
const result = recordFrom(job.result);
const baidu = recordFrom(result.baidu);
return stringFrom(baidu.fs_id || baidu.fsId || job.fsId);
}
async function wechatArchiveWebhook(req: Request): Promise<Response> {
if (req.method !== "POST") return jsonResponse({ ok: false, error: "method not allowed" }, 405);
const expected = config.wechatArchiveToken;
const supplied = wechatArchiveTokenFromRequest(req);
if (expected === null || supplied === null || !timingSafeStringEqual(supplied, expected)) {
return jsonResponse({ ok: false, error: "authentication required", valuesPrinted: false }, 401);
}
const started = performance.now();
try {
const body = await readJsonRecord(req, 2 * 1024 * 1024);
const uploadBody = archiveContentPayload(body);
const created = await coreJson("/api/microservices/baidu-netdisk/proxy/api/transfers/upload-content", {
method: "POST",
body: JSON.stringify(uploadBody),
});
if (created.ok !== true) {
recordOperationPerformance("frontend", "wechat_archive_upload_create", performance.now() - started, false, stringFrom(created.status));
return jsonResponse({ ok: false, error: "baidu upload create failed", upstream: created, valuesPrinted: false }, 502);
}
const job = recordFrom(created.job);
const jobId = stringFrom(job.id);
if (!jobId) throw new Error("baidu upload create did not return job id");
const waited = await waitBaiduTransfer(jobId, 90_000);
const waitedJob = recordFrom(waited.job);
const fsId = baiduFsIdFromJob(waitedJob);
const remotePath = stringFrom(uploadBody.remotePath);
const ok = waited.ok === true && fsId.length > 0;
recordOperationPerformance("frontend", "wechat_archive_upload", performance.now() - started, ok, `${jobId}:${remotePath}`);
logger(ok ? "info" : "warn", "wechat_archive_upload_finished", { ok, jobId, remotePath, fsIdPresent: fsId.length > 0 });
return jsonResponse({
ok,
response: ok ? `已归档到百度网盘:${remotePath}` : "归档失败",
archive: {
remotePath,
fsId,
local: recordFrom(created.local),
uploadJob: waitedJob,
listed: null,
},
valuesPrinted: false,
}, ok ? 200 : 502);
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
recordOperationPerformance("frontend", "wechat_archive_upload", performance.now() - started, false, message);
logger("warn", "wechat_archive_upload_failed", { error: message });
return jsonResponse({ ok: false, error: message, valuesPrinted: false }, 400);
}
}
async function proxyApi(req: Request, url: URL): Promise<Response> {
if (sessionFromRequest(req) === null) {
return jsonResponse({ ok: false, error: "authentication required" }, 401);
@@ -952,6 +1093,7 @@ async function handleRequest(req: Request, server: Server<FrontendWsData>): Prom
if (url.pathname === "/logout" && req.method === "POST") return logout();
if (url.pathname === "/api/session") return sessionResponse(req);
if (url.pathname === "/api/frontend-performance") return frontendPerformanceResponse(req);
if (url.pathname === "/webhooks/wechat-archive") return wechatArchiveWebhook(req);
if (url.pathname === "/ws/ssh") return proxySshWebSocket(req, server);
if (url.pathname.startsWith("/api/") || url.pathname === "/logs") return proxyApi(req, url);
if (url.pathname === "/docs" || url.pathname.startsWith("/docs/")) return docsResponse(req, url);
@@ -1032,6 +1032,38 @@ async function createTransferJob(direction: TransferDirection, body: JsonRecord)
return { ok: true, job: transferFromRow(row) };
}
function safeStagingFilename(value: string, fallback: string): string {
const base = basename(value || fallback || "upload.txt").replace(/[^A-Za-z0-9._-]+/gu, "_").replace(/^_+|_+$/gu, "");
return base || fallback || "upload.txt";
}
function contentUploadBuffer(body: JsonRecord): { buffer: Buffer; source: string } {
if (typeof body.dataBase64 === "string" && body.dataBase64.length > 0) return { buffer: Buffer.from(body.dataBase64, "base64"), source: "dataBase64" };
if (typeof body.content === "string") return { buffer: Buffer.from(body.content, "utf8"), source: "content" };
throw new HttpError(400, "upload-content requires content or dataBase64");
}
async function createContentUpload(body: JsonRecord): Promise<JsonRecord> {
const { buffer, source } = contentUploadBuffer(body);
const maxBytes = Math.max(1, Math.min(10 * 1024 * 1024, asNumber(body.maxBytes, 10 * 1024 * 1024)));
if (buffer.byteLength > maxBytes) throw new HttpError(413, "upload-content body is too large", { maxBytes, bytes: buffer.byteLength });
const remotePath = remoteFilePath(String(body.remotePath || ""), safeStagingFilename(String(body.filename || ""), "upload.txt"));
const filename = safeStagingFilename(String(body.filename || pathPosix.basename(remotePath)), pathPosix.basename(remotePath) || "upload.txt");
const relativePath = pathPosix.join("content-uploads", `${Date.now()}-${randomUUID().slice(0, 8)}-${filename}`);
const localPath = resolveStagingPath(relativePath);
mkdirSync(dirname(localPath), { recursive: true });
await writeFile(localPath, buffer);
const sha256 = createHash("sha256").update(buffer).digest("hex");
const created = await createTransferJob("upload", { localPath: relativePath, remotePath });
return {
ok: created.ok === true,
source,
remotePath,
local: { relativePath, bytes: buffer.byteLength, sha256 },
job: asRecord(created.job),
};
}
async function computeMd5Blocks(filePath: string, jobId: string): Promise<{ size: number; fullMd5: string; blocks: string[] }> {
const info = await stat(filePath);
if (!info.isFile()) throw new HttpError(400, "localPath must be a file inside staging directory", { localPath: filePath });
@@ -1363,6 +1395,7 @@ async function route(req: Request): Promise<Response> {
if (path === "/api/folders" && req.method === "POST") return jsonResponse(await createFolder(await readJsonBody(req)));
if (path === "/api/files/manage" && req.method === "POST") return jsonResponse(await manageFiles(await readJsonBody(req)));
if (path === "/api/transfers/upload-from-path" && req.method === "POST") return jsonResponse(await createTransferJob("upload", await readJsonBody(req)));
if (path === "/api/transfers/upload-content" && req.method === "POST") return jsonResponse(await createContentUpload(await readJsonBody(req)));
if (path === "/api/transfers/download-to-path" && req.method === "POST") return jsonResponse(await createTransferJob("download", await readJsonBody(req)));
if (path === "/api/self-test" && req.method === "POST") return jsonResponse(await runSelfTest(await readJsonBody(req)));
if (path === "/api/transfers" && req.method === "GET") return jsonResponse(await listTransfers(url));