fix: tolerate code queue database disconnects
This commit is contained in:
@@ -205,12 +205,18 @@ let persistTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
let persistDirty = false;
|
||||
let shutdownRequested = false;
|
||||
let serviceReady = false;
|
||||
const sql: SqlClient = postgres(config.databaseUrl, {
|
||||
let sql: SqlClient = createSqlClient();
|
||||
let sqlRotationInFlight: Promise<void> | null = null;
|
||||
|
||||
function createSqlClient(): SqlClient {
|
||||
return postgres(config.databaseUrl, {
|
||||
max: config.databasePoolMax,
|
||||
idle_timeout: 20,
|
||||
connect_timeout: 10,
|
||||
connection: { application_name: "unidesk-code-queue" },
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
let databaseReady = false;
|
||||
let databaseLastError: string | null = null;
|
||||
let databaseFlushTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
@@ -1120,6 +1126,54 @@ function errorToJson(error: unknown): JsonValue {
|
||||
return String(error);
|
||||
}
|
||||
|
||||
/**
 * Builds the text that transient-error detection matches against.
 *
 * For `Error` instances the result is the message, a newline, and the stack
 * (empty when the stack is missing). For other objects, string-valued
 * `message`, `code` and `stack` fields are used when present, so a rejection
 * reason such as `{ message, code }` does not collapse to "[object Object]".
 * Everything else falls back to `String(error)` followed by a newline.
 *
 * @param error - Any thrown or rejected value.
 * @returns Searchable text in the form `<message>\n<stack>`.
 */
function transientDatabaseErrorMessage(error: unknown): string {
  if (error instanceof Error) {
    return `${error.message}\n${error.stack ?? ""}`;
  }
  if (typeof error === "object" && error !== null) {
    const { message, code, stack } = error as { message?: unknown; code?: unknown; stack?: unknown };
    const headline = [message, code]
      .filter((part): part is string => typeof part === "string" && part.length > 0)
      .join(" ");
    const trace = typeof stack === "string" ? stack : "";
    // Use the structured form only when the object carries usable text;
    // otherwise keep the plain String() fallback below.
    if (headline.length > 0 || trace.length > 0) return `${headline}\n${trace}`;
  }
  return `${String(error)}\n`;
}
|
||||
|
||||
/**
 * Decides whether a thrown value looks like a retryable database failure.
 *
 * The check runs against the text from `transientDatabaseErrorMessage`:
 * - any text containing "deadlock detected" is transient;
 * - otherwise the text must contain one of the client/environment markers
 *   AND one of the connection-failure markers.
 *
 * @param error - Any thrown or rejected value.
 * @returns `true` when the value matches the transient patterns.
 */
function isTransientDatabaseError(error: unknown): boolean {
  const haystack = transientDatabaseErrorMessage(error);

  // Deadlocks count as transient no matter where the text came from.
  const isDeadlock = /deadlock detected/iu.test(haystack);
  if (isDeadlock) return true;

  // Connection-failure markers only count when the text also points at the
  // database client or this deployment's database endpoint.
  const fromDatabaseClient = /node_modules\/postgres|postgres\/src|d601-tcp-egress-gateway|:15432|unidesk_code_queue|set_config/iu.test(haystack);
  return (
    fromDatabaseClient &&
    /CONNECTION_CLOSED|ECONNRESET|EPIPE|ETIMEDOUT|Connection terminated|socket connection was closed|socket\.write|null is not an object/iu.test(haystack)
  );
}
|
||||
|
||||
/**
 * Computes the wait before the next retry: 150 ms per attempt, capped at 2000 ms.
 *
 * @param attempt - Attempt number; values below 1 are treated as 1.
 * @returns Delay in milliseconds.
 */
function transientDatabaseRetryDelayMs(attempt: number): number {
  const effectiveAttempt = Math.max(1, attempt);
  const linearDelayMs = 150 * effectiveAttempt;
  return Math.min(2000, linearDelayMs);
}
|
||||
|
||||
/**
 * Swaps the module-level `sql` client for a fresh one and closes the old one.
 *
 * `databaseLastError` is updated on every call, including calls that only
 * join a rotation that is already running. While a rotation is in flight no
 * second client is created; the caller gets the pending promise instead.
 *
 * @param reason - Label included in the log entries for this rotation.
 * @param error - The failure that triggered the rotation.
 * @returns A promise that settles once the previous client has finished
 *          closing; close failures are logged rather than rethrown.
 */
async function rotateSqlClientAfterTransientError(reason: string, error: unknown): Promise<void> {
  databaseLastError = databaseErrorMessage(error);

  const pendingRotation = sqlRotationInFlight;
  if (pendingRotation !== null) return pendingRotation;

  // Install the replacement before closing the old client, so code that
  // reads `sql` from here on uses the new client.
  const retiredClient = sql;
  sql = createSqlClient();

  const reportCloseFailure = (closeError: unknown) =>
    logger("warn", "database_client_close_failed", { reason, error: errorToJson(closeError) });
  const clearInFlightMarker = (): void => {
    sqlRotationInFlight = null;
  };

  sqlRotationInFlight = retiredClient
    .end({ timeout: 1 })
    .catch(reportCloseFailure)
    .finally(clearInFlightMarker);

  logger("warn", "database_client_rotated", { reason, error: errorToJson(error) });
  return sqlRotationInFlight;
}
|
||||
|
||||
/**
 * Runs `action`, retrying when it fails with a transient database error.
 *
 * Before each retry the failure is logged, the SQL client is rotated, and the
 * function sleeps for `transientDatabaseRetryDelayMs(attempt)`. Non-transient
 * errors, and the error from the final attempt, are rethrown unchanged.
 *
 * @param operation - Label used in log entries and as the rotation reason.
 * @param action - The work to run; it is called again from scratch on each retry.
 * @param attempts - Maximum number of tries. Values below 1 (or NaN) are
 *                   treated as 1, so `action` always runs at least once.
 * @returns Whatever `action` resolves to.
 * @throws The last error raised by `action`.
 */
async function withTransientDatabaseRetries<T>(operation: string, action: () => Promise<T>, attempts = 3): Promise<T> {
  // Without this clamp a limit below 1 would skip the loop and throw `null`.
  const maxAttempts = attempts >= 1 ? attempts : 1;
  let lastError: unknown = null;
  for (let attempt = 1; attempt <= maxAttempts; attempt += 1) {
    try {
      return await action();
    } catch (error) {
      lastError = error;
      if (!isTransientDatabaseError(error) || attempt >= maxAttempts) throw error;
      // Compute the delay once so the logged value matches the slept value.
      const delayMs = transientDatabaseRetryDelayMs(attempt);
      logger("warn", "database_transient_retry", { operation, attempt, attempts: maxAttempts, delayMs, error: errorToJson(error) });
      await rotateSqlClientAfterTransientError(operation, error);
      await Bun.sleep(delayMs);
    }
  }
  // Only reachable when a fractional limit ends the loop after a retry.
  throw lastError;
}
|
||||
|
||||
function judgeFailureDetailsForOutput(judge: JudgeResult): string {
|
||||
const details = judge.failureDetails;
|
||||
if (details === undefined || details === null) return "";
|
||||
@@ -1550,10 +1604,10 @@ async function deleteTaskFromDatabase(taskId: string): Promise<void> {
|
||||
async function claimTaskInDatabase(task: QueueTask, expectedQueueId: string): Promise<boolean> {
|
||||
if (!databaseReady) return true;
|
||||
const timeout = `${config.databaseClaimTimeoutMs}ms`;
|
||||
const claimed = await sql.begin(async (client) => {
|
||||
const claimed = await withTransientDatabaseRetries("claim-task", async () => sql.begin(async (client) => {
|
||||
await client`SELECT set_config('statement_timeout', ${timeout}, true), set_config('lock_timeout', ${timeout}, true)`;
|
||||
return await upsertTaskToDatabase(client, task, { claimQueueId: expectedQueueId });
|
||||
});
|
||||
}));
|
||||
if (claimed) return true;
|
||||
const databaseTask = await loadTaskFromDatabase(task.id);
|
||||
if (databaseTask !== null) reconcileHotTaskFromDatabase(databaseTask);
|
||||
@@ -2020,24 +2074,26 @@ async function flushDirtyTasksToDatabase(force = false): Promise<void> {
|
||||
scheduleDatabaseFlush();
|
||||
return;
|
||||
}
|
||||
const ids = Array.from(dirtyDatabaseTaskIds);
|
||||
const queueIds = Array.from(dirtyDatabaseQueueIds);
|
||||
const ids = Array.from(dirtyDatabaseTaskIds).sort();
|
||||
const queueIds = Array.from(dirtyDatabaseQueueIds).sort();
|
||||
if (ids.length === 0 && queueIds.length === 0) return;
|
||||
dirtyDatabaseTaskIds.clear();
|
||||
dirtyDatabaseQueueIds.clear();
|
||||
databaseFlushInFlight = true;
|
||||
const rejectedTaskIds: string[] = [];
|
||||
let rejectedTaskIds: string[] = [];
|
||||
try {
|
||||
await sql.begin(async (client) => {
|
||||
rejectedTaskIds = await withTransientDatabaseRetries("flush-dirty-tasks", async () => sql.begin(async (client) => {
|
||||
const currentRejectedTaskIds: string[] = [];
|
||||
for (const id of queueIds) {
|
||||
const queue = state.queues.find((item) => item.id === id);
|
||||
if (queue !== undefined) await upsertQueueToDatabase(client, queue);
|
||||
}
|
||||
for (const id of ids) {
|
||||
const task = state.tasks.find((item) => item.id === id);
|
||||
if (task !== undefined && !await upsertTaskToDatabase(client, task)) rejectedTaskIds.push(id);
|
||||
if (task !== undefined && !await upsertTaskToDatabase(client, task)) currentRejectedTaskIds.push(id);
|
||||
}
|
||||
});
|
||||
return currentRejectedTaskIds;
|
||||
}));
|
||||
databaseLastError = null;
|
||||
} catch (error) {
|
||||
for (const id of ids) dirtyDatabaseTaskIds.add(id);
|
||||
@@ -3992,6 +4048,23 @@ function installShutdownHandlers(): void {
|
||||
};
|
||||
process.once("SIGTERM", stop);
|
||||
process.once("SIGINT", stop);
|
||||
process.on("uncaughtException", (error) => {
|
||||
if (isTransientDatabaseError(error)) {
|
||||
void rotateSqlClientAfterTransientError("uncaught-exception", error).catch((closeError) => logger("warn", "database_client_rotate_uncaught_failed", { error: errorToJson(closeError) }));
|
||||
logger("error", "transient_database_uncaught_exception_suppressed", { error: errorToJson(error) });
|
||||
return;
|
||||
}
|
||||
logger("error", "uncaught_exception", { error: errorToJson(error) });
|
||||
throw error;
|
||||
});
|
||||
process.on("unhandledRejection", (reason) => {
|
||||
if (isTransientDatabaseError(reason)) {
|
||||
void rotateSqlClientAfterTransientError("unhandled-rejection", reason).catch((error) => logger("warn", "database_client_rotate_rejection_failed", { error: errorToJson(error) }));
|
||||
logger("error", "transient_database_unhandled_rejection_suppressed", { error: errorToJson(reason) });
|
||||
return;
|
||||
}
|
||||
logger("error", "unhandled_rejection", { error: errorToJson(reason) });
|
||||
});
|
||||
}
|
||||
|
||||
setInterval(() => {
|
||||
|
||||
Reference in New Issue
Block a user