fix: route D601 PK01 postgres through master relay (#967)
Co-authored-by: Codex <codex@noreply.local>
This commit is contained in:
@@ -0,0 +1,147 @@
|
||||
const net = require("node:net");
|
||||
const fs = require("node:fs");
|
||||
|
||||
function readInt(name, fallback) {
|
||||
const raw = process.env[name];
|
||||
if (!raw) return fallback;
|
||||
const parsed = Number(raw);
|
||||
if (!Number.isInteger(parsed) || parsed <= 0 || parsed > 65535) {
|
||||
throw new Error(`${name} must be a TCP port number`);
|
||||
}
|
||||
return parsed;
|
||||
}
|
||||
|
||||
const listenHost = process.env.PK01_POSTGRES_RELAY_LISTEN_HOST || "0.0.0.0";
|
||||
const listenPort = readInt("PK01_POSTGRES_RELAY_LISTEN_PORT", 15433);
|
||||
const targetHost = process.env.PK01_POSTGRES_RELAY_TARGET_HOST || "82.156.23.220";
|
||||
const targetPort = readInt("PK01_POSTGRES_RELAY_TARGET_PORT", 5432);
|
||||
const idleTimeoutMs = Number(process.env.PK01_POSTGRES_RELAY_IDLE_TIMEOUT_MS || 300000);
|
||||
const allowedSourceCidrs = (process.env.PK01_POSTGRES_RELAY_ALLOWED_SOURCE_CIDRS || "127.0.0.1/32,10.0.0.0/8,172.16.0.0/12,192.168.0.0/16")
|
||||
.split(",")
|
||||
.map((item) => item.trim())
|
||||
.filter(Boolean);
|
||||
const logFile = process.env.LOG_FILE || "";
|
||||
|
||||
function log(level, message, data = {}) {
|
||||
const record = JSON.stringify({
|
||||
ts: new Date().toISOString(),
|
||||
service: "pk01-postgres-relay",
|
||||
level,
|
||||
message,
|
||||
data,
|
||||
});
|
||||
console.log(record);
|
||||
if (logFile) {
|
||||
fs.appendFile(logFile, `${record}\n`, () => {});
|
||||
}
|
||||
}
|
||||
|
||||
function exitWithError(error) {
|
||||
log("error", "fatal", { error: error instanceof Error ? error.message : String(error) });
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
function ipv4ToInt(value) {
|
||||
const parts = value.split(".");
|
||||
if (parts.length !== 4) return null;
|
||||
let result = 0;
|
||||
for (const part of parts) {
|
||||
if (!/^\d+$/u.test(part)) return null;
|
||||
const octet = Number(part);
|
||||
if (!Number.isInteger(octet) || octet < 0 || octet > 255) return null;
|
||||
result = ((result << 8) | octet) >>> 0;
|
||||
}
|
||||
return result >>> 0;
|
||||
}
|
||||
|
||||
function normalizeRemoteAddress(value) {
|
||||
if (!value) return "";
|
||||
if (value.startsWith("::ffff:")) return value.slice("::ffff:".length);
|
||||
return value;
|
||||
}
|
||||
|
||||
function sourceAllowed(remoteAddress) {
|
||||
const normalized = normalizeRemoteAddress(remoteAddress);
|
||||
if (normalized === "::1") return true;
|
||||
const ip = ipv4ToInt(normalized);
|
||||
if (ip === null) return false;
|
||||
return allowedSourceCidrs.some((cidr) => {
|
||||
const [baseRaw, prefixRaw = "32"] = cidr.split("/");
|
||||
const base = ipv4ToInt(baseRaw);
|
||||
const prefix = Number(prefixRaw);
|
||||
if (base === null || !Number.isInteger(prefix) || prefix < 0 || prefix > 32) return false;
|
||||
const mask = prefix === 0 ? 0 : (0xffffffff << (32 - prefix)) >>> 0;
|
||||
return (ip & mask) === (base & mask);
|
||||
});
|
||||
}
|
||||
|
||||
async function runHealthcheck() {
|
||||
await new Promise((resolve, reject) => {
|
||||
const socket = net.connect({ host: "127.0.0.1", port: listenPort }, resolve);
|
||||
socket.setTimeout(2000);
|
||||
socket.on("timeout", () => reject(new Error("healthcheck timeout")));
|
||||
socket.on("error", reject);
|
||||
socket.on("connect", () => socket.destroy());
|
||||
});
|
||||
}
|
||||
|
||||
if (process.argv.includes("--healthcheck")) {
|
||||
runHealthcheck().then(() => process.exit(0), exitWithError);
|
||||
} else {
|
||||
let nextConnectionId = 0;
|
||||
const server = net.createServer((client) => {
|
||||
const connectionId = ++nextConnectionId;
|
||||
const remoteAddress = normalizeRemoteAddress(client.remoteAddress || "");
|
||||
if (!sourceAllowed(remoteAddress)) {
|
||||
log("warn", "source_rejected", { connectionId, remoteAddress, allowedSourceCidrs });
|
||||
client.destroy();
|
||||
return;
|
||||
}
|
||||
const upstream = net.connect({ host: targetHost, port: targetPort });
|
||||
client.setNoDelay(true);
|
||||
upstream.setNoDelay(true);
|
||||
client.setTimeout(idleTimeoutMs);
|
||||
upstream.setTimeout(idleTimeoutMs);
|
||||
|
||||
let closed = false;
|
||||
const closeBoth = (reason) => {
|
||||
if (closed) return;
|
||||
closed = true;
|
||||
client.destroy();
|
||||
upstream.destroy();
|
||||
log("info", "closed", { connectionId, reason });
|
||||
};
|
||||
|
||||
upstream.on("connect", () => {
|
||||
log("info", "connected", { connectionId, target: `${targetHost}:${targetPort}` });
|
||||
});
|
||||
client.on("timeout", () => closeBoth("client_idle_timeout"));
|
||||
upstream.on("timeout", () => closeBoth("upstream_idle_timeout"));
|
||||
client.on("error", (error) => closeBoth(`client_error:${error.message}`));
|
||||
upstream.on("error", (error) => closeBoth(`upstream_error:${error.message}`));
|
||||
client.on("close", () => closeBoth("client_closed"));
|
||||
upstream.on("close", () => closeBoth("upstream_closed"));
|
||||
|
||||
client.pipe(upstream);
|
||||
upstream.pipe(client);
|
||||
});
|
||||
|
||||
server.on("error", exitWithError);
|
||||
server.listen(listenPort, listenHost, () => {
|
||||
log("info", "listening", {
|
||||
listen: `${listenHost}:${listenPort}`,
|
||||
target: `${targetHost}:${targetPort}`,
|
||||
networkMode: "host",
|
||||
allowedSourceCidrs,
|
||||
});
|
||||
});
|
||||
|
||||
process.on("SIGTERM", () => {
|
||||
log("info", "shutdown_requested", { signal: "SIGTERM" });
|
||||
server.close(() => process.exit(0));
|
||||
});
|
||||
process.on("SIGINT", () => {
|
||||
log("info", "shutdown_requested", { signal: "SIGINT" });
|
||||
server.close(() => process.exit(0));
|
||||
});
|
||||
}
|
||||
Reference in New Issue
Block a user