Merge pull request #1002 from pikasTech/fix/998-test-accounts-db-mode

fix: 让 HWLAB test-accounts 按 DB mode 选择执行面
This commit is contained in:
Lyon
2026-06-26 15:54:47 +08:00
committed by GitHub
+293 -11
View File
@@ -4,6 +4,8 @@ import { dirname, isAbsolute, join } from "node:path";
import postgres from "postgres";
import { repoRoot, rootPath } from "./config";
import { runCommand, type CommandResult } from "./command";
import type { RuntimeSecretSpec } from "./hwlab-node/entry";
import { runtimeSecretSpec } from "./hwlab-node/public-exposure";
const defaultConfigPath = "config/hwlab-test-accounts.yaml";
const apiKeyTargetKey = "api-key";
@@ -93,6 +95,13 @@ interface SourceMaterial {
blocker: Record<string, unknown> | null;
}
interface DatabaseAccess {
kind: "platform-service" | "local-k3s";
runtimeStoreMode: string | null;
databaseUrlSource: SourceMaterial;
runtimeSecretSpec: RuntimeSecretSpec | null;
}
export async function runHwlabTestAccountsCommand(args: string[]): Promise<Record<string, unknown>> {
if (args.length === 0 || args.includes("--help") || args.includes("-h") || args[0] === "help") return hwlabTestAccountsHelp();
const options = parseOptions(args);
@@ -178,19 +187,22 @@ async function hostEnvSync(config: LoadedConfig, target: Target, options: Option
}
async function status(config: LoadedConfig, target: Target, options: Options): Promise<Record<string, unknown>> {
const database = readSource(target.userBilling.databaseUrlSource, config.sourceRoot, false);
const database = databaseAccessForTarget(config, target);
const accounts = await accountStatuses(config, target, options, database);
return report("status", config, target, options, accounts, database, { mutation: false });
}
async function sync(config: LoadedConfig, target: Target, options: Options): Promise<Record<string, unknown>> {
const database = readSource(target.userBilling.databaseUrlSource, config.sourceRoot, false);
const database = databaseAccessForTarget(config, target);
if (!options.confirm) {
return report("sync", config, target, options, [], database, { mutation: false, blockers: [{ code: "confirm_required", message: "sync requires --confirm" }] });
}
const sources = target.accounts.map((account) => readAccountSource(account, config.sourceRoot, options.createSources));
const sourceBlockers = [database.ok ? null : database.blocker, ...sources.map((source) => source.ok ? null : source.blocker)].filter(Boolean) as Record<string, unknown>[];
if (sourceBlockers.length > 0 || !database.value) {
const sourceBlockers = [
database.kind === "platform-service" && !database.databaseUrlSource.ok ? database.databaseUrlSource.blocker : null,
...sources.map((source) => source.ok ? null : source.blocker),
].filter(Boolean) as Record<string, unknown>[];
if (sourceBlockers.length > 0 || (database.kind === "platform-service" && !database.databaseUrlSource.value)) {
const accounts = await accountStatuses(config, target, options, database, sources);
return report("sync", config, target, options, accounts, database, { mutation: false, blockers: sourceBlockers });
}
@@ -207,7 +219,32 @@ async function sync(config: LoadedConfig, target: Target, options: Options): Pro
return report("sync", config, target, options, accounts, database, { mutation: synced.some((item) => item.mutation === true), synced });
}
const sql = postgres(database.value, { max: 1 });
if (database.kind === "local-k3s") {
const syncBlockers: Record<string, unknown>[] = [];
for (const account of userBillingAccounts) {
const source = sources.find((item) => item.sourceRef === account.sourceRef && item.sourceKey === account.sourceKey);
if (!source?.value || !source.sha256Hex || !source.serviceKeyPrefix) throw new Error(`${account.logicalId} source material missing after preflight`);
const targetStatus = userBillingApiKeyStatusLocalK3s(database.runtimeSecretSpec, account, source, options);
const statusBlockers = targetStatusAvailabilityBlockers(account, targetStatus);
if (statusBlockers.length > 0) {
syncBlockers.push(...statusBlockers);
continue;
}
if (userBillingAccountMatches(account, targetStatus)) {
synced.push(skippedUserBillingAccountSync(account, source, targetStatus));
continue;
}
synced.push(syncUserBillingAccountLocalK3s(database.runtimeSecretSpec, account, source, options));
}
if (syncBlockers.length > 0) {
const accounts = await accountStatuses(config, target, options, database, sources);
return report("sync", config, target, options, accounts, database, { mutation: synced.some((item) => item.mutation === true), synced, blockers: syncBlockers });
}
const accounts = await accountStatuses(config, target, options, database);
return report("sync", config, target, options, accounts, database, { mutation: synced.some((item) => item.mutation === true), synced });
}
const sql = postgres(database.databaseUrlSource.value ?? "", { max: 1 });
try {
await sql.begin(async (tx: any) => {
for (const account of userBillingAccounts) {
@@ -229,17 +266,20 @@ async function sync(config: LoadedConfig, target: Target, options: Options): Pro
return report("sync", config, target, options, accounts, database, { mutation: synced.some((item) => item.mutation === true), synced });
}
async function accountStatuses(config: LoadedConfig, target: Target, options: Options, database: SourceMaterial, preloadedSources?: SourceMaterial[]): Promise<Record<string, unknown>[]> {
const sql = database.value ? postgres(database.value, { max: 1 }) : null;
async function accountStatuses(config: LoadedConfig, target: Target, options: Options, database: DatabaseAccess, preloadedSources?: SourceMaterial[]): Promise<Record<string, unknown>[]> {
const sql = database.kind === "platform-service" && database.databaseUrlSource.value ? postgres(database.databaseUrlSource.value, { max: 1 }) : null;
try {
const statuses: Record<string, unknown>[] = [];
for (const account of target.accounts) {
const source = preloadedSources?.find((item) => item.sourceRef === account.sourceRef && item.sourceKey === account.sourceKey) ?? readAccountSource(account, config.sourceRoot, false);
const targetStatus = account.target.kind === "kubernetes-secret"
? runtimeSecretStatus(target, account, source, options)
: await userBillingApiKeyStatus(sql, account, source);
: database.kind === "local-k3s"
? userBillingApiKeyStatusLocalK3s(database.runtimeSecretSpec, account, source, options)
: await userBillingApiKeyStatus(sql, account, source);
const blockers = [
...validateAccount(account, source),
...targetStatusAvailabilityBlockers(account, targetStatus),
...targetFingerprintBlockers(account, targetStatus),
];
statuses.push({
@@ -268,6 +308,13 @@ function targetFingerprintBlockers(account: Account, targetStatus: Record<string
return [];
}
function targetStatusAvailabilityBlockers(account: Account, targetStatus: Record<string, unknown>): Record<string, unknown>[] {
if (targetStatus.checked === false || typeof targetStatus.error === "string") {
return [{ code: "target_status_unavailable", logicalId: account.logicalId, targetKind: account.target.kind, reason: targetStatus.error ?? targetStatus.reason ?? "unknown" }];
}
return [];
}
function readAccountSource(account: Account, sourceRoot: string, allowCreate: boolean): SourceMaterial {
if (allowCreate && account.createIfMissing.enabled && account.createIfMissing.randomBase64Url !== null) {
const sourcePath = resolveSourcePath(account.sourceRef, sourceRoot);
@@ -390,6 +437,197 @@ function skippedUserBillingAccountSync(account: Account, source: SourceMaterial,
};
}
function databaseAccessForTarget(config: LoadedConfig, target: Target): DatabaseAccess {
const spec = runtimeSecretSpec({ node: target.node, lane: target.lane });
const runtimeStoreMode = spec.runtimeLaneSpec?.runtimeStore?.postgres?.mode ?? null;
if (runtimeStoreMode === "local-k3s") {
return {
kind: "local-k3s",
runtimeStoreMode,
databaseUrlSource: inactiveDatabaseSource(target.userBilling.databaseUrlSource, config.sourceRoot),
runtimeSecretSpec: spec,
};
}
return {
kind: "platform-service",
runtimeStoreMode,
databaseUrlSource: readSource(target.userBilling.databaseUrlSource, config.sourceRoot, false),
runtimeSecretSpec: spec,
};
}
function inactiveDatabaseSource(source: SourceRef, sourceRoot: string): SourceMaterial {
const sourcePath = resolveSourcePath(source.sourceRef, sourceRoot);
return {
ok: true,
sourceRef: source.sourceRef,
sourceKey: source.sourceKey,
sourcePath: displaySourcePath(sourcePath),
exists: existsSync(sourcePath),
byteCount: null,
keyPrefix: null,
serviceKeyPrefix: null,
fingerprint: null,
sha256Hex: null,
value: null,
blocker: null,
};
}
function userBillingApiKeyStatusLocalK3s(spec: RuntimeSecretSpec | null, account: Account, source: SourceMaterial, options: Options): Record<string, unknown> {
if (spec === null) return { checked: false, kind: account.target.kind, reason: "runtime_secret_spec_missing", valuesRedacted: true };
const result = runLocalK3sUserBillingSql(spec, userBillingStatusSql(account, source), options);
const parsed = parseJsonObjectLine(result.stdout);
if (result.exitCode !== 0 || parsed === null) {
return {
checked: false,
kind: account.target.kind,
error: compactText(result.stderr || result.stdout || "local-k3s-user-billing-status-failed"),
result: compactCommandResult(result),
valuesRedacted: true,
};
}
if (parsed.ok === false) {
return { checked: false, kind: account.target.kind, error: String(parsed.error ?? "local-k3s-user-billing-status-failed"), result: compactCommandResult(result), valuesRedacted: true };
}
return {
checked: true,
kind: account.target.kind,
exists: parsed.exists === true,
userId: account.userId,
keyId: account.target.keyId,
user: parsed.user ?? null,
apiKey: normalizeApiKeyStatus(parsed.apiKey, account),
result: compactCommandResult(result),
valuesRedacted: true,
};
}
function syncUserBillingAccountLocalK3s(spec: RuntimeSecretSpec | null, account: Account, source: SourceMaterial, options: Options): Record<string, unknown> {
if (spec === null) throw new Error(`${account.logicalId} local-k3s runtime secret spec missing`);
if (!source.sha256Hex || !source.serviceKeyPrefix) throw new Error(`${account.logicalId} source material missing hash after preflight`);
const result = runLocalK3sUserBillingSql(spec, syncUserBillingAccountSql(account, source), options);
const parsed = parseJsonObjectLine(result.stdout);
if (result.exitCode !== 0 || parsed === null || parsed.ok === false) {
throw new Error(`${account.logicalId} local-k3s user-billing sync failed: ${compactText(result.stderr || result.stdout || String(parsed?.error ?? "unknown"))}`);
}
return {
logicalId: account.logicalId,
targetKind: account.target.kind,
mutation: parsed.mutation === true,
userId: account.userId,
keyId: account.target.keyId,
keyPrefix: source.serviceKeyPrefix,
fingerprint: source.fingerprint,
result: compactCommandResult(result),
valuesRedacted: true,
};
}
function runLocalK3sUserBillingSql(spec: RuntimeSecretSpec, sql: string, options: Options): CommandResult {
const script = [
"set -eu",
`namespace=${shellQuote(spec.namespace)}`,
`postgres_secret=${shellQuote(spec.postgresSecret)}`,
`postgres_statefulset=${shellQuote(spec.postgresStatefulSet)}`,
`postgres_admin_user=${shellQuote(spec.postgresAdminUser)}`,
`db_name=${shellQuote(spec.cloudApiDbName)}`,
"secret_b64_key() { kubectl -n \"$namespace\" get secret \"$1\" -o \"go-template={{ index .data \\\"$2\\\" }}\" 2>/dev/null || true; }",
"postgres_admin_b64=$(secret_b64_key \"$postgres_secret\" POSTGRES_PASSWORD)",
"if [ -z \"$postgres_admin_b64\" ]; then printf '{\"ok\":false,\"error\":\"postgres-admin-secret-missing\"}\\n'; exit 0; fi",
"postgres_admin_password=$(printf '%s' \"$postgres_admin_b64\" | base64 -d 2>/dev/null || true)",
"if [ -z \"$postgres_admin_password\" ]; then printf '{\"ok\":false,\"error\":\"postgres-admin-secret-decode-failed\"}\\n'; exit 0; fi",
"kubectl -n \"$namespace\" exec -i \"statefulset/$postgres_statefulset\" -c postgres -- env PGPASSWORD=\"$postgres_admin_password\" psql -X -q -t -A -v ON_ERROR_STOP=1 -U \"$postgres_admin_user\" -d \"$db_name\" <<'SQL'",
sql,
"SQL",
].join("\n");
return runCommand([transPath(), `${spec.node}:k3s`, "sh"], repoRoot, { input: script, timeoutMs: options.timeoutSeconds * 1000 });
}
function userBillingStatusSql(account: Account, source: SourceMaterial): string {
const expectedHash = source.ok && source.sha256Hex ? source.sha256Hex : "";
return `
SELECT jsonb_build_object(
'ok', true,
'checked', true,
'kind', ${sqlLiteral(account.target.kind)},
'exists', u.id IS NOT NULL,
'user', CASE WHEN u.id IS NULL THEN NULL ELSE jsonb_build_object(
'id', u.id,
'email', u.email,
'username', u.username,
'displayName', u.display_name,
'role', u.role,
'status', u.status,
'planId', COALESCE(a.plan_id, ''),
'balanceCredits', COALESCE(a.balance_credits, 0),
'reservedCredits', COALESCE(a.reserved_credits, 0)
) END,
'apiKey', CASE WHEN k.id IS NULL THEN jsonb_build_object(
'exists', false,
'keyId', ${sqlLiteral(account.target.keyId ?? "")},
'keyPrefix', NULL,
'status', NULL,
'scopes', '[]'::jsonb,
'matchesSourceFingerprint', NULL
) ELSE jsonb_build_object(
'exists', true,
'keyId', ${sqlLiteral(account.target.keyId ?? "")},
'keyPrefix', k.key_prefix,
'status', k.status,
'scopes', COALESCE(k.scopes_json, '[]'::jsonb),
'matchesSourceFingerprint', CASE WHEN ${sqlLiteral(expectedHash)} = '' OR k.key_hash IS NULL THEN NULL ELSE k.key_hash = ${sqlLiteral(expectedHash)} END
) END
)::text
FROM (SELECT 1) seed
LEFT JOIN hwlab_users u ON u.id = ${sqlLiteral(account.userId)}
LEFT JOIN hwlab_credit_accounts a ON a.user_id = u.id
LEFT JOIN hwlab_api_keys k ON k.id = ${sqlLiteral(account.target.keyId ?? "")} AND k.user_id = u.id
LIMIT 1;`.trim();
}
function syncUserBillingAccountSql(account: Account, source: SourceMaterial): string {
const metadata = JSON.stringify({ source: "unidesk-hwlab-test-accounts", logicalId: account.logicalId, sourceRef: account.sourceRef });
return `
BEGIN;
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM hwlab_users WHERE id <> ${sqlLiteral(account.userId)} AND (lower(username) = lower(${sqlLiteral(account.username)}) OR lower(email) = lower(${sqlLiteral(account.email ?? "")})) LIMIT 1) THEN
RAISE EXCEPTION 'test account username/email is already owned by another user';
END IF;
IF EXISTS (SELECT 1 FROM hwlab_api_keys WHERE key_hash = ${sqlLiteral(source.sha256Hex ?? "")} AND id <> ${sqlLiteral(account.target.keyId ?? "")} LIMIT 1) THEN
RAISE EXCEPTION 'test account API key hash is already owned by another key';
END IF;
END $$;
INSERT INTO hwlab_users (id, email, username, display_name, password_hash, status, role, email_verified)
VALUES (${sqlLiteral(account.userId)}, ${sqlLiteral(account.email ?? "")}, ${sqlLiteral(account.username)}, ${sqlLiteral(account.displayName)}, 'owner-only-api-key-account', ${sqlLiteral(account.status)}, ${sqlLiteral(account.role)}, true)
ON CONFLICT (id) DO UPDATE SET email = EXCLUDED.email, username = EXCLUDED.username, display_name = EXCLUDED.display_name, status = EXCLUDED.status, role = EXCLUDED.role, email_verified = true, updated_at = now();
INSERT INTO hwlab_credit_accounts (user_id, balance_credits, reserved_credits, plan_id)
VALUES (${sqlLiteral(account.userId)}, ${account.initialCredits}, 0, ${sqlLiteral(account.planId)})
ON CONFLICT (user_id) DO UPDATE SET balance_credits = GREATEST(hwlab_credit_accounts.balance_credits, EXCLUDED.balance_credits), plan_id = EXCLUDED.plan_id, updated_at = now();
${account.initialCredits > 0 ? `INSERT INTO hwlab_credit_ledger (id, user_id, delta_credits, balance_before, balance_after, kind, reason, source, status, idempotency_key, operator_user_id, operator_username, metadata)
VALUES (${sqlLiteral(`led_${account.logicalId.replaceAll("-", "_")}_initial`)}, ${sqlLiteral(account.userId)}, ${account.initialCredits}, 0, ${account.initialCredits}, 'admin_grant', 'test_account_initial_credit', 'test-account-sync', 'applied', ${sqlLiteral(`test-account-sync:${account.logicalId}:initial-credit`)}, 'usr_v03_admin', 'admin', ${sqlLiteral(metadata)}::jsonb)
ON CONFLICT DO NOTHING;` : ""}
INSERT INTO hwlab_api_keys (id, user_id, name, key_prefix, key_hash, scopes_json, status, revoked_at)
VALUES (${sqlLiteral(account.target.keyId ?? "")}, ${sqlLiteral(account.userId)}, ${sqlLiteral(account.target.keyName)}, ${sqlLiteral(source.serviceKeyPrefix ?? "")}, ${sqlLiteral(source.sha256Hex ?? "")}, ${sqlLiteral(JSON.stringify(account.target.scopes))}::jsonb, 'active', NULL)
ON CONFLICT (id) DO UPDATE SET user_id = EXCLUDED.user_id, name = EXCLUDED.name, key_prefix = EXCLUDED.key_prefix, key_hash = EXCLUDED.key_hash, scopes_json = EXCLUDED.scopes_json, status = 'active', revoked_at = NULL;
COMMIT;
SELECT jsonb_build_object('ok', true, 'mutation', true, 'userId', ${sqlLiteral(account.userId)}, 'keyId', ${sqlLiteral(account.target.keyId ?? "")}, 'keyPrefix', ${sqlLiteral(source.serviceKeyPrefix ?? "")}, 'valuesRedacted', true)::text;`.trim();
}
function normalizeApiKeyStatus(value: unknown, account: Account): Record<string, unknown> {
const apiKey = plainObject(value);
if (apiKey === null) return { exists: false, keyId: account.target.keyId, keyPrefix: null, status: null, scopes: [], matchesSourceFingerprint: null };
return {
exists: apiKey.exists === true,
keyId: apiKey.keyId ?? account.target.keyId,
keyPrefix: apiKey.keyPrefix ?? null,
status: apiKey.status ?? null,
scopes: parseJsonArray(apiKey.scopes),
matchesSourceFingerprint: apiKey.matchesSourceFingerprint === true ? true : apiKey.matchesSourceFingerprint === false ? false : null,
};
}
function syncKubernetesSecret(target: Target, account: Account, source: SourceMaterial, options: Options): Record<string, unknown> {
const namespace = account.target.namespace ?? target.namespace;
const secretName = account.target.secretName ?? "";
@@ -620,9 +858,9 @@ function compactTargetStatus(status: Record<string, unknown>): Record<string, un
return target;
}
function report(action: string, config: LoadedConfig, target: Target, options: Options, accounts: Record<string, unknown>[], database: SourceMaterial, extra: Record<string, unknown> = {}): Record<string, unknown> {
function report(action: string, config: LoadedConfig, target: Target, options: Options, accounts: Record<string, unknown>[], database: DatabaseAccess, extra: Record<string, unknown> = {}): Record<string, unknown> {
const blockers = [
...(database.ok ? [] : [database.blocker]),
...(database.kind === "platform-service" && !database.databaseUrlSource.ok ? [database.databaseUrlSource.blocker] : []),
...accounts.flatMap((account) => Array.isArray(account.blockers) ? account.blockers : []),
...((extra.blockers as Record<string, unknown>[] | undefined) ?? []),
].filter(Boolean);
@@ -633,7 +871,8 @@ function report(action: string, config: LoadedConfig, target: Target, options: O
version: config.version,
target: { id: target.id, node: target.node, lane: target.lane, namespace: target.namespace, publicUrl: target.publicUrl, userBillingServiceId: target.userBilling.serviceId },
sourceRoot: config.sourceRoot,
databaseUrlSource: publicSource(database),
databaseAccess: publicDatabaseAccess(database),
databaseUrlSource: publicSource(database.databaseUrlSource),
accounts,
blockers,
next: blockers.length === 0 ? undefined : { sync: `bun scripts/cli.ts hwlab nodes test-accounts sync --node ${options.node} --lane ${options.lane} --confirm` },
@@ -726,6 +965,27 @@ function publicSource(source: SourceMaterial): Record<string, unknown> {
return { sourceRef: source.sourceRef, sourceKey: source.sourceKey, sourcePath: source.sourcePath, exists: source.exists, byteCount: source.byteCount, keyPrefix: source.keyPrefix, fingerprint: source.fingerprint, valuesRedacted: true };
}
function publicDatabaseAccess(database: DatabaseAccess): Record<string, unknown> {
const spec = database.runtimeSecretSpec;
return {
mode: database.kind,
runtimeStoreMode: database.runtimeStoreMode,
databaseUrlSource: { ...publicSource(database.databaseUrlSource), active: database.kind === "platform-service" },
localK3s: database.kind === "local-k3s" && spec !== null
? {
namespace: spec.namespace,
postgresStatefulSet: spec.postgresStatefulSet,
postgresSecret: spec.postgresSecret,
database: spec.cloudApiDbName,
user: spec.cloudApiDbUser,
cloudApiDbSecret: spec.cloudApiDbSecret,
cloudApiDbKey: spec.cloudApiDbKey,
}
: null,
valuesRedacted: true,
};
}
function selectTarget(config: LoadedConfig, options: Options): Target {
const target = config.targets.find((item) => item.node === options.node && item.lane === options.lane);
if (!target) throw new Error(`${config.configPath} has no target for node=${options.node} lane=${options.lane}`);
@@ -809,6 +1069,28 @@ function shellQuote(value: string): string {
return `'${value.replace(/'/gu, "'\\''")}'`;
}
function sqlLiteral(value: string): string {
return `'${value.replace(/'/gu, "''")}'`;
}
function parseJsonObjectLine(text: string): Record<string, unknown> | null {
for (const line of text.split(/\r?\n/u)) {
const trimmed = line.trim();
if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) continue;
try {
const parsed = JSON.parse(trimmed) as unknown;
return plainObject(parsed);
} catch {
continue;
}
}
return null;
}
function compactText(value: string): string {
return value.replace(/\b([A-Za-z0-9_.-]*(?:TOKEN|PASSWORD|SECRET|API_KEY|DATABASE_URL)[A-Za-z0-9_.-]*)=([^\s"'`]+)/giu, "$1=<redacted>").trim().slice(0, 1000);
}
function requiredValue(args: string[], index: number, name: string): string {
const value = args[index];
if (value === undefined || value.length === 0 || value.startsWith("--")) throw new Error(`${name} requires a value`);