From f4787a8cf0ddb9a773c05d86ea3dca5e8d93bd35 Mon Sep 17 00:00:00 2001 From: Codex Date: Sun, 17 May 2026 15:18:47 +0000 Subject: [PATCH] fix: recover decision center database connections --- .../decision-center/src/index.ts | 134 ++++++++++++------ 1 file changed, 90 insertions(+), 44 deletions(-) diff --git a/src/components/microservices/decision-center/src/index.ts b/src/components/microservices/decision-center/src/index.ts index 3fda8d9f..22f3b4c5 100644 --- a/src/components/microservices/decision-center/src/index.ts +++ b/src/components/microservices/decision-center/src/index.ts @@ -71,7 +71,7 @@ const serviceStartedAt = new Date().toISOString(); const recentLogs: JsonRecord[] = []; let schemaReady = false; let schemaLastError: JsonRecord | null = null; -let databaseHealth: JsonRecord = { ok: false, recordCount: 0, checkedAt: "", error: null }; +let databaseHealth: JsonRecord = { ok: false, recordCount: 0, checkedAt: "", lastOkAt: "", degraded: false, error: null }; let databaseHealthRefreshInFlight = false; function envString(name: string, fallback: string): string { @@ -99,12 +99,18 @@ function configFromEnv(): RuntimeConfig { } const config = configFromEnv(); -const sql = postgres(config.databaseUrl, { - max: config.databasePoolMax, - idle_timeout: 20, - connect_timeout: 10, - connection: { application_name: "unidesk-decision-center" }, -}); +type SqlClient = ReturnType; + +function createSqlClient(): SqlClient { + return postgres(config.databaseUrl, { + max: config.databasePoolMax, + idle_timeout: 20, + connect_timeout: 5, + connection: { application_name: "unidesk-decision-center" }, + }); +} + +let sql = createSqlClient(); const logWriter = config.logFile ? createHourlyJsonlWriter({ baseLogFile: config.logFile, @@ -150,6 +156,35 @@ function errorResponse(error: unknown): Response { return jsonResponse(body, status); } +function shouldResetSqlClient(error: unknown): boolean { + if (error instanceof HttpError) return false; + const message = error instanceof Error ? `${error.name} ${error.message}` : String(error); + return /CONNECT_TIMEOUT|ECONN|ETIMEDOUT|EPIPE|connection|terminated|closed|offset is out of range/iu.test(message); +} + +async function resetSqlClient(operation: string, error: unknown, failedClient: SqlClient): Promise { + if (!shouldResetSqlClient(error) || failedClient !== sql) return; + const previous = sql; + sql = createSqlClient(); + log("warn", "postgres_client_reset", { operation, error: errorToJson(error) }); + try { + await previous.end({ timeout: 1 }); + } catch (closeError) { + log("warn", "postgres_client_close_failed", { operation, error: errorToJson(closeError) }); + } +} + +async function withDatabaseRecovery(operation: string, run: () => Promise, options: { retryRead?: boolean } = {}): Promise { + const failedClient = sql; + try { + return await run(); + } catch (error) { + await resetSqlClient(operation, error, failedClient); + if (options.retryRead === true && shouldResetSqlClient(error)) return run(); + throw error; + } +} + function iso(value: Date | string | null | undefined): string { if (value === null || value === undefined) return ""; const date = value instanceof Date ? value : new Date(value); @@ -250,30 +285,32 @@ async function readJsonBody(req: Request): Promise> { } async function ensureSchema(): Promise { - await sql` - CREATE TABLE IF NOT EXISTS decision_center_records ( - id TEXT PRIMARY KEY, - type TEXT NOT NULL, - level TEXT NOT NULL DEFAULT 'none', - status TEXT NOT NULL DEFAULT 'active', - title TEXT NOT NULL, - body TEXT NOT NULL DEFAULT '', - linked_goal_id TEXT, - tags JSONB NOT NULL DEFAULT '[]'::jsonb, - evidence_links JSONB NOT NULL DEFAULT '[]'::jsonb, - source_session TEXT NOT NULL DEFAULT '', - task_id TEXT NOT NULL DEFAULT '', - commit_id TEXT NOT NULL DEFAULT '', - created_at TIMESTAMPTZ NOT NULL DEFAULT now(), - updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), - CONSTRAINT decision_center_records_type_check CHECK (type IN ('meeting', 'decision', 'goal', 'blocker', 'debt', 'experiment')), - CONSTRAINT decision_center_records_level_check CHECK (level IN ('G0', 'G1', 'G2', 'G3', 'P0', 'P1', 'P2', 'P3', 'none')), - CONSTRAINT decision_center_records_status_check CHECK (status IN ('active', 'blocked', 'parked', 'done')) - ) - `; - await sql`CREATE INDEX IF NOT EXISTS idx_decision_center_records_type_status_level ON decision_center_records(type, status, level)`; - await sql`CREATE INDEX IF NOT EXISTS idx_decision_center_records_linked_goal ON decision_center_records(linked_goal_id)`; - await sql`CREATE INDEX IF NOT EXISTS idx_decision_center_records_updated ON decision_center_records(updated_at DESC)`; + await withDatabaseRecovery("ensure_schema", async () => { + await sql` + CREATE TABLE IF NOT EXISTS decision_center_records ( + id TEXT PRIMARY KEY, + type TEXT NOT NULL, + level TEXT NOT NULL DEFAULT 'none', + status TEXT NOT NULL DEFAULT 'active', + title TEXT NOT NULL, + body TEXT NOT NULL DEFAULT '', + linked_goal_id TEXT, + tags JSONB NOT NULL DEFAULT '[]'::jsonb, + evidence_links JSONB NOT NULL DEFAULT '[]'::jsonb, + source_session TEXT NOT NULL DEFAULT '', + task_id TEXT NOT NULL DEFAULT '', + commit_id TEXT NOT NULL DEFAULT '', + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + CONSTRAINT decision_center_records_type_check CHECK (type IN ('meeting', 'decision', 'goal', 'blocker', 'debt', 'experiment')), + CONSTRAINT decision_center_records_level_check CHECK (level IN ('G0', 'G1', 'G2', 'G3', 'P0', 'P1', 'P2', 'P3', 'none')), + CONSTRAINT decision_center_records_status_check CHECK (status IN ('active', 'blocked', 'parked', 'done')) + ) + `; + await sql`CREATE INDEX IF NOT EXISTS idx_decision_center_records_type_status_level ON decision_center_records(type, status, level)`; + await sql`CREATE INDEX IF NOT EXISTS idx_decision_center_records_linked_goal ON decision_center_records(linked_goal_id)`; + await sql`CREATE INDEX IF NOT EXISTS idx_decision_center_records_updated ON decision_center_records(updated_at DESC)`; + }, { retryRead: true }); } async function waitForSchema(): Promise { @@ -298,18 +335,27 @@ async function refreshDatabaseHealth(): Promise { if (databaseHealthRefreshInFlight) return; databaseHealthRefreshInFlight = true; try { - const rows = await sql<{ count: string | number }[]>`SELECT count(*) AS count FROM decision_center_records`; + const rows = await withDatabaseRecovery("refresh_database_health", () => sql<{ count: string | number }[]>`SELECT count(*) AS count FROM decision_center_records`, { retryRead: true }); + const checkedAt = new Date().toISOString(); databaseHealth = { ok: true, recordCount: Number(rows[0]?.count ?? 0), - checkedAt: new Date().toISOString(), + checkedAt, + lastOkAt: checkedAt, + degraded: false, error: null, }; } catch (error) { + const checkedAt = new Date().toISOString(); + const lastOkAt = typeof databaseHealth.lastOkAt === "string" ? databaseHealth.lastOkAt : ""; + const lastOkMs = Date.parse(lastOkAt); + const withinGrace = Number.isFinite(lastOkMs) && Date.now() - lastOkMs < 5 * 60_000; databaseHealth = { - ok: false, - recordCount: 0, - checkedAt: new Date().toISOString(), + ok: withinGrace, + recordCount: Number(databaseHealth.recordCount ?? 0), + checkedAt, + lastOkAt, + degraded: withinGrace, error: errorToJson(error), }; } finally { @@ -348,7 +394,7 @@ async function createRecord(input: Record): Promise 240) throw new HttpError(400, "title must be at most 240 characters"); if (body.length > 300_000) throw new HttpError(400, "body must be at most 300000 characters"); const id = asString(input.id) || `dc_${randomUUID()}`; - const rows = await sql` + const rows = await withDatabaseRecovery("create_record", () => sql` INSERT INTO decision_center_records ( id, type, level, status, title, body, linked_goal_id, tags, evidence_links, source_session, task_id, commit_id ) VALUES ( @@ -366,14 +412,14 @@ async function createRecord(input: Record): Promise): Promise { const existing = await getRecord(id); - const rows = await sql` + const rows = await withDatabaseRecovery("update_record", () => sql` UPDATE decision_center_records SET type = ${"type" in input ? parseRecordType(input.type, existing.type) : existing.type}, @@ -390,12 +436,12 @@ async function updateRecord(id: string, input: Record): Promise updated_at = now() WHERE id = ${id} RETURNING * - `; + `); return recordFromRow(rows[0]!); } async function getRecord(id: string): Promise { - const rows = await sql`SELECT * FROM decision_center_records WHERE id = ${id}`; + const rows = await withDatabaseRecovery("get_record", () => sql`SELECT * FROM decision_center_records WHERE id = ${id}`, { retryRead: true }); if (rows.length === 0) throw new HttpError(404, "decision record not found", { id }); return recordFromRow(rows[0]!); } @@ -409,7 +455,7 @@ async function listRecords(url: URL): Promise { if (type && !recordTypes.has(type as DecisionRecordType)) throw new HttpError(400, "unsupported type filter", { type }); if (status && !recordStatuses.has(status as DecisionRecordStatus)) throw new HttpError(400, "unsupported status filter", { status }); if (level && !recordLevels.has(level as DecisionRecordLevel)) throw new HttpError(400, "unsupported level filter", { level }); - const rows = await sql` + const rows = await withDatabaseRecovery("list_records", () => sql` SELECT * FROM decision_center_records WHERE (${type || null}::text IS NULL OR type = ${type || null}) @@ -430,7 +476,7 @@ async function listRecords(url: URL): Promise { END ASC, updated_at DESC LIMIT ${limit} - `; + `, { retryRead: true }); return rows.map(recordFromRow); } @@ -476,7 +522,7 @@ async function importMeeting(input: Record): Promise { - const rows = await sql`DELETE FROM decision_center_records WHERE id = ${id} RETURNING *`; + const rows = await withDatabaseRecovery("delete_record", () => sql`DELETE FROM decision_center_records WHERE id = ${id} RETURNING *`); if (rows.length === 0) throw new HttpError(404, "decision record not found", { id }); log("info", "record_deleted", { id }); return { ok: true, deleted: recordFromRow(rows[0]!) };