diff --git a/backend/worker.py b/backend/worker.py index d025dcf..b5a2f91 100644 --- a/backend/worker.py +++ b/backend/worker.py @@ -1,6 +1,7 @@ """Background job worker using pgqueuer.""" import asyncio +import contextlib import json import logging import random @@ -169,11 +170,27 @@ async def main() -> None: logger.info("Worker started, processing queue jobs and scheduled jobs...") - # Run both managers concurrently - await asyncio.gather( - qm.run(), - sm.run(), + # Run both managers concurrently - if either fails, both stop + queue_task = asyncio.create_task(qm.run(), name="queue_manager") + scheduler_task = asyncio.create_task(sm.run(), name="scheduler_manager") + + done, pending = await asyncio.wait( + [queue_task, scheduler_task], + return_when=asyncio.FIRST_EXCEPTION, ) + + # Cancel any pending tasks + for task in pending: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + + # Check for exceptions in completed tasks + for task in done: + exc = task.exception() + if exc is not None: + logger.error(f"Task '{task.get_name()}' failed: {exc}") + raise exc finally: await queue_conn.close() await scheduler_conn.close()