fix: recover decision center database connections

This commit is contained in:
Codex
2026-05-17 15:18:47 +00:00
parent d9258ac134
commit f4787a8cf0
@@ -71,7 +71,7 @@ const serviceStartedAt = new Date().toISOString();
const recentLogs: JsonRecord[] = [];
let schemaReady = false;
let schemaLastError: JsonRecord | null = null;
let databaseHealth: JsonRecord = { ok: false, recordCount: 0, checkedAt: "", error: null };
let databaseHealth: JsonRecord = { ok: false, recordCount: 0, checkedAt: "", lastOkAt: "", degraded: false, error: null };
let databaseHealthRefreshInFlight = false;
function envString(name: string, fallback: string): string {
@@ -99,12 +99,18 @@ function configFromEnv(): RuntimeConfig {
}
const config = configFromEnv();
const sql = postgres(config.databaseUrl, {
max: config.databasePoolMax,
idle_timeout: 20,
connect_timeout: 10,
connection: { application_name: "unidesk-decision-center" },
});
type SqlClient = ReturnType<typeof postgres>;
function createSqlClient(): SqlClient {
return postgres(config.databaseUrl, {
max: config.databasePoolMax,
idle_timeout: 20,
connect_timeout: 5,
connection: { application_name: "unidesk-decision-center" },
});
}
let sql = createSqlClient();
const logWriter = config.logFile
? createHourlyJsonlWriter({
baseLogFile: config.logFile,
@@ -150,6 +156,35 @@ function errorResponse(error: unknown): Response {
return jsonResponse(body, status);
}
function shouldResetSqlClient(error: unknown): boolean {
if (error instanceof HttpError) return false;
const message = error instanceof Error ? `${error.name} ${error.message}` : String(error);
return /CONNECT_TIMEOUT|ECONN|ETIMEDOUT|EPIPE|connection|terminated|closed|offset is out of range/iu.test(message);
}
async function resetSqlClient(operation: string, error: unknown, failedClient: SqlClient): Promise<void> {
if (!shouldResetSqlClient(error) || failedClient !== sql) return;
const previous = sql;
sql = createSqlClient();
log("warn", "postgres_client_reset", { operation, error: errorToJson(error) });
try {
await previous.end({ timeout: 1 });
} catch (closeError) {
log("warn", "postgres_client_close_failed", { operation, error: errorToJson(closeError) });
}
}
async function withDatabaseRecovery<T>(operation: string, run: () => Promise<T>, options: { retryRead?: boolean } = {}): Promise<T> {
const failedClient = sql;
try {
return await run();
} catch (error) {
await resetSqlClient(operation, error, failedClient);
if (options.retryRead === true && shouldResetSqlClient(error)) return run();
throw error;
}
}
function iso(value: Date | string | null | undefined): string {
if (value === null || value === undefined) return "";
const date = value instanceof Date ? value : new Date(value);
@@ -250,30 +285,32 @@ async function readJsonBody(req: Request): Promise<Record<string, unknown>> {
}
async function ensureSchema(): Promise<void> {
await sql`
CREATE TABLE IF NOT EXISTS decision_center_records (
id TEXT PRIMARY KEY,
type TEXT NOT NULL,
level TEXT NOT NULL DEFAULT 'none',
status TEXT NOT NULL DEFAULT 'active',
title TEXT NOT NULL,
body TEXT NOT NULL DEFAULT '',
linked_goal_id TEXT,
tags JSONB NOT NULL DEFAULT '[]'::jsonb,
evidence_links JSONB NOT NULL DEFAULT '[]'::jsonb,
source_session TEXT NOT NULL DEFAULT '',
task_id TEXT NOT NULL DEFAULT '',
commit_id TEXT NOT NULL DEFAULT '',
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
CONSTRAINT decision_center_records_type_check CHECK (type IN ('meeting', 'decision', 'goal', 'blocker', 'debt', 'experiment')),
CONSTRAINT decision_center_records_level_check CHECK (level IN ('G0', 'G1', 'G2', 'G3', 'P0', 'P1', 'P2', 'P3', 'none')),
CONSTRAINT decision_center_records_status_check CHECK (status IN ('active', 'blocked', 'parked', 'done'))
)
`;
await sql`CREATE INDEX IF NOT EXISTS idx_decision_center_records_type_status_level ON decision_center_records(type, status, level)`;
await sql`CREATE INDEX IF NOT EXISTS idx_decision_center_records_linked_goal ON decision_center_records(linked_goal_id)`;
await sql`CREATE INDEX IF NOT EXISTS idx_decision_center_records_updated ON decision_center_records(updated_at DESC)`;
await withDatabaseRecovery("ensure_schema", async () => {
await sql`
CREATE TABLE IF NOT EXISTS decision_center_records (
id TEXT PRIMARY KEY,
type TEXT NOT NULL,
level TEXT NOT NULL DEFAULT 'none',
status TEXT NOT NULL DEFAULT 'active',
title TEXT NOT NULL,
body TEXT NOT NULL DEFAULT '',
linked_goal_id TEXT,
tags JSONB NOT NULL DEFAULT '[]'::jsonb,
evidence_links JSONB NOT NULL DEFAULT '[]'::jsonb,
source_session TEXT NOT NULL DEFAULT '',
task_id TEXT NOT NULL DEFAULT '',
commit_id TEXT NOT NULL DEFAULT '',
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
CONSTRAINT decision_center_records_type_check CHECK (type IN ('meeting', 'decision', 'goal', 'blocker', 'debt', 'experiment')),
CONSTRAINT decision_center_records_level_check CHECK (level IN ('G0', 'G1', 'G2', 'G3', 'P0', 'P1', 'P2', 'P3', 'none')),
CONSTRAINT decision_center_records_status_check CHECK (status IN ('active', 'blocked', 'parked', 'done'))
)
`;
await sql`CREATE INDEX IF NOT EXISTS idx_decision_center_records_type_status_level ON decision_center_records(type, status, level)`;
await sql`CREATE INDEX IF NOT EXISTS idx_decision_center_records_linked_goal ON decision_center_records(linked_goal_id)`;
await sql`CREATE INDEX IF NOT EXISTS idx_decision_center_records_updated ON decision_center_records(updated_at DESC)`;
}, { retryRead: true });
}
async function waitForSchema(): Promise<void> {
@@ -298,18 +335,27 @@ async function refreshDatabaseHealth(): Promise<void> {
if (databaseHealthRefreshInFlight) return;
databaseHealthRefreshInFlight = true;
try {
const rows = await sql<{ count: string | number }[]>`SELECT count(*) AS count FROM decision_center_records`;
const rows = await withDatabaseRecovery("refresh_database_health", () => sql<{ count: string | number }[]>`SELECT count(*) AS count FROM decision_center_records`, { retryRead: true });
const checkedAt = new Date().toISOString();
databaseHealth = {
ok: true,
recordCount: Number(rows[0]?.count ?? 0),
checkedAt: new Date().toISOString(),
checkedAt,
lastOkAt: checkedAt,
degraded: false,
error: null,
};
} catch (error) {
const checkedAt = new Date().toISOString();
const lastOkAt = typeof databaseHealth.lastOkAt === "string" ? databaseHealth.lastOkAt : "";
const lastOkMs = Date.parse(lastOkAt);
const withinGrace = Number.isFinite(lastOkMs) && Date.now() - lastOkMs < 5 * 60_000;
databaseHealth = {
ok: false,
recordCount: 0,
checkedAt: new Date().toISOString(),
ok: withinGrace,
recordCount: Number(databaseHealth.recordCount ?? 0),
checkedAt,
lastOkAt,
degraded: withinGrace,
error: errorToJson(error),
};
} finally {
@@ -348,7 +394,7 @@ async function createRecord(input: Record<string, unknown>): Promise<DecisionRec
if (title.length > 240) throw new HttpError(400, "title must be at most 240 characters");
if (body.length > 300_000) throw new HttpError(400, "body must be at most 300000 characters");
const id = asString(input.id) || `dc_${randomUUID()}`;
const rows = await sql<DecisionRecordRow[]>`
const rows = await withDatabaseRecovery("create_record", () => sql<DecisionRecordRow[]>`
INSERT INTO decision_center_records (
id, type, level, status, title, body, linked_goal_id, tags, evidence_links, source_session, task_id, commit_id
) VALUES (
@@ -366,14 +412,14 @@ async function createRecord(input: Record<string, unknown>): Promise<DecisionRec
${asString(input.commitId)}
)
RETURNING *
`;
`);
log("info", "record_created", { id: rows[0]?.id ?? id, type: rows[0]?.type ?? "", level: rows[0]?.level ?? "" });
return recordFromRow(rows[0]!);
}
async function updateRecord(id: string, input: Record<string, unknown>): Promise<DecisionRecord> {
const existing = await getRecord(id);
const rows = await sql<DecisionRecordRow[]>`
const rows = await withDatabaseRecovery("update_record", () => sql<DecisionRecordRow[]>`
UPDATE decision_center_records
SET
type = ${"type" in input ? parseRecordType(input.type, existing.type) : existing.type},
@@ -390,12 +436,12 @@ async function updateRecord(id: string, input: Record<string, unknown>): Promise
updated_at = now()
WHERE id = ${id}
RETURNING *
`;
`);
return recordFromRow(rows[0]!);
}
async function getRecord(id: string): Promise<DecisionRecord> {
const rows = await sql<DecisionRecordRow[]>`SELECT * FROM decision_center_records WHERE id = ${id}`;
const rows = await withDatabaseRecovery("get_record", () => sql<DecisionRecordRow[]>`SELECT * FROM decision_center_records WHERE id = ${id}`, { retryRead: true });
if (rows.length === 0) throw new HttpError(404, "decision record not found", { id });
return recordFromRow(rows[0]!);
}
@@ -409,7 +455,7 @@ async function listRecords(url: URL): Promise<DecisionRecord[]> {
if (type && !recordTypes.has(type as DecisionRecordType)) throw new HttpError(400, "unsupported type filter", { type });
if (status && !recordStatuses.has(status as DecisionRecordStatus)) throw new HttpError(400, "unsupported status filter", { status });
if (level && !recordLevels.has(level as DecisionRecordLevel)) throw new HttpError(400, "unsupported level filter", { level });
const rows = await sql<DecisionRecordRow[]>`
const rows = await withDatabaseRecovery("list_records", () => sql<DecisionRecordRow[]>`
SELECT *
FROM decision_center_records
WHERE (${type || null}::text IS NULL OR type = ${type || null})
@@ -430,7 +476,7 @@ async function listRecords(url: URL): Promise<DecisionRecord[]> {
END ASC,
updated_at DESC
LIMIT ${limit}
`;
`, { retryRead: true });
return rows.map(recordFromRow);
}
@@ -476,7 +522,7 @@ async function importMeeting(input: Record<string, unknown>): Promise<JsonRecord
}
async function deleteRecord(id: string): Promise<JsonRecord> {
const rows = await sql<DecisionRecordRow[]>`DELETE FROM decision_center_records WHERE id = ${id} RETURNING *`;
const rows = await withDatabaseRecovery("delete_record", () => sql<DecisionRecordRow[]>`DELETE FROM decision_center_records WHERE id = ${id} RETURNING *`);
if (rows.length === 0) throw new HttpError(404, "decision record not found", { id });
log("info", "record_deleted", { id });
return { ok: true, deleted: recordFromRow(rows[0]!) };