From 1af0854d803d9a4d93e16347311f6e784a4f3cd6 Mon Sep 17 00:00:00 2001
From: counterweight
Date: Mon, 22 Dec 2025 16:24:40 +0100
Subject: [PATCH] fix: improve worker task failure handling

Use asyncio.wait with FIRST_EXCEPTION to:
- Properly name tasks for better error logging
- Cancel remaining tasks when one fails
- Log which specific manager failed before propagating the exception
---
 backend/worker.py | 25 +++++++++++++++++++++----
 1 file changed, 21 insertions(+), 4 deletions(-)

diff --git a/backend/worker.py b/backend/worker.py
index d025dcf..b5a2f91 100644
--- a/backend/worker.py
+++ b/backend/worker.py
@@ -1,6 +1,7 @@
 """Background job worker using pgqueuer."""

 import asyncio
+import contextlib
 import json
 import logging
 import random
@@ -169,11 +170,27 @@ async def main() -> None:

         logger.info("Worker started, processing queue jobs and scheduled jobs...")

-        # Run both managers concurrently
-        await asyncio.gather(
-            qm.run(),
-            sm.run(),
+        # Run both managers concurrently - if either fails, both stop
+        queue_task = asyncio.create_task(qm.run(), name="queue_manager")
+        scheduler_task = asyncio.create_task(sm.run(), name="scheduler_manager")
+
+        done, pending = await asyncio.wait(
+            [queue_task, scheduler_task],
+            return_when=asyncio.FIRST_EXCEPTION,
         )
+
+        # Cancel any pending tasks
+        for task in pending:
+            task.cancel()
+            with contextlib.suppress(asyncio.CancelledError):
+                await task
+
+        # Check for exceptions in completed tasks
+        for task in done:
+            exc = task.exception()
+            if exc is not None:
+                logger.error(f"Task '{task.get_name()}' failed: {exc}")
+                raise exc
     finally:
         await queue_conn.close()
         await scheduler_conn.close()