arbret/backend/worker.py
counterweight 1af0854d80
fix: improve worker task failure handling
Use asyncio.wait with FIRST_EXCEPTION to:
- Properly name tasks for better error logging
- Cancel remaining tasks when one fails
- Log which specific manager failed before propagating the exception
2025-12-22 16:24:40 +01:00

"""Background job worker using pgqueuer."""
import asyncio
import contextlib
import json
import logging
import random
import time
from datetime import UTC, datetime
import asyncpg
from pgqueuer import Job, QueueManager, SchedulerManager
from pgqueuer.db import AsyncpgDriver
from pgqueuer.models import Schedule
from pgqueuer.queries import Queries
from database import ASYNCPG_DATABASE_URL
from jobs import JOB_RANDOM_NUMBER
from price_fetcher import PAIR_BTC_EUR, SOURCE_BITFINEX, fetch_btc_eur_price
# Scheduled job type (internal to worker, not enqueued externally)
JOB_FETCH_BITCOIN_PRICE = "fetch_bitcoin_price"
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("worker")


async def install_schema() -> None:
    """Install pgqueuer schema if not already present."""
    conn = await asyncpg.connect(ASYNCPG_DATABASE_URL)
    try:
        queries = Queries.from_asyncpg_connection(conn)
        # Check if schema is already installed by looking for the main table
        if not await queries.has_table("pgqueuer"):
            await queries.install()
            logger.info("pgqueuer schema installed")
        else:
            logger.info("pgqueuer schema already exists")
    finally:
        await conn.close()
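

# For reference, a producer (e.g. the API process) would enqueue a
# random-number job roughly like this. Illustrative sketch only, not part of
# this worker; it assumes an open asyncpg connection `conn` and an example
# user_id of 42:
#
#     queries = Queries.from_asyncpg_connection(conn)
#     await queries.enqueue(
#         JOB_RANDOM_NUMBER,
#         json.dumps({"user_id": 42}).encode(),
#     )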


async def process_random_number_job(job: Job, db_pool: asyncpg.Pool) -> None:
    """
    Process a random number job.

    - Parse user_id from the payload
    - Generate a random number in the range 0-100 (inclusive)
    - Record the execution duration
    - Store the outcome in the database
    """
    start_time = time.perf_counter()
    # Parse payload
    payload_str = job.payload.decode() if job.payload else "{}"
    try:
        payload = json.loads(payload_str)
    except json.JSONDecodeError as e:
        logger.error(f"Job {job.id}: Invalid JSON payload: {e}")
        return
    user_id = payload.get("user_id")
    if user_id is None:
        logger.error(f"Job {job.id}: Missing user_id in payload")
        return
    # Generate random number
    value = random.randint(0, 100)
    # Calculate duration (covers parsing and generation only, not the DB write)
    duration_ms = int((time.perf_counter() - start_time) * 1000)
    # Store outcome
    async with db_pool.acquire() as conn:
        await conn.execute(
            """
            INSERT INTO random_number_outcomes
            (job_id, triggered_by_user_id, value, duration_ms, status, created_at)
            VALUES ($1, $2, $3, $4, $5, NOW())
            """,
            job.id,
            user_id,
            value,
            duration_ms,
            "completed",
        )
    logger.info(
        f"Job {job.id}: Generated random number {value} for user {user_id} "
        f"(duration: {duration_ms}ms)"
    )
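

# Sketch of the table the worker writes outcomes to. The shape is assumed
# from the INSERT above; the actual migration lives elsewhere in the repo:
#
#     CREATE TABLE random_number_outcomes (
#         id BIGSERIAL PRIMARY KEY,
#         job_id BIGINT NOT NULL,
#         triggered_by_user_id BIGINT NOT NULL,
#         value INTEGER NOT NULL,
#         duration_ms INTEGER NOT NULL,
#         status TEXT NOT NULL,
#         created_at TIMESTAMPTZ NOT NULL
#     );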


def register_job_handlers(qm: QueueManager, db_pool: asyncpg.Pool) -> None:
    """Register all job handlers with the queue manager."""

    @qm.entrypoint(JOB_RANDOM_NUMBER)
    async def handle_random_number(job: Job) -> None:
        """Handle random_number job entrypoint."""
        await process_random_number_job(job, db_pool)


async def process_bitcoin_price_job(db_pool: asyncpg.Pool) -> None:
    """
    Fetch and store the current BTC/EUR price from Bitfinex.

    This function is designed never to raise: exceptions are caught and logged
    so the scheduler can continue with the next scheduled run.
    """
    try:
        price, timestamp = await fetch_btc_eur_price()
        async with db_pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO price_history
                (source, pair, price, timestamp, created_at)
                VALUES ($1, $2, $3, $4, $5)
                ON CONFLICT (source, pair, timestamp) DO NOTHING
                """,
                SOURCE_BITFINEX,
                PAIR_BTC_EUR,
                price,
                timestamp,
                datetime.now(UTC),
            )
        logger.info(f"Fetched BTC/EUR price: €{price:.2f}")
    except Exception as e:
        # Swallow the error so the next scheduled run still happens
        logger.error(f"Failed to fetch Bitcoin price: {e}")


def register_scheduled_jobs(sm: SchedulerManager, db_pool: asyncpg.Pool) -> None:
    """Register all scheduled jobs with the scheduler manager."""

    # Run every minute: "* * * * *" means every minute of every hour of every day
    @sm.schedule(JOB_FETCH_BITCOIN_PRICE, "* * * * *")
    async def fetch_bitcoin_price(schedule: Schedule) -> None:
        """Fetch Bitcoin price from Bitfinex every minute."""
        await process_bitcoin_price_job(db_pool)


async def main() -> None:
    """Main worker entry point."""
    logger.info("Installing pgqueuer schema...")
    await install_schema()
    logger.info("Connecting to database...")
    # Connection for queue manager
    queue_conn = await asyncpg.connect(ASYNCPG_DATABASE_URL)
    # Connection for scheduler manager
    scheduler_conn = await asyncpg.connect(ASYNCPG_DATABASE_URL)
    # Connection pool for application data
    db_pool = await asyncpg.create_pool(ASYNCPG_DATABASE_URL, min_size=1, max_size=5)
    try:
        # Setup queue manager for on-demand jobs
        queue_driver = AsyncpgDriver(queue_conn)
        qm = QueueManager(queue_driver)
        register_job_handlers(qm, db_pool)
        # Setup scheduler manager for periodic jobs
        scheduler_driver = AsyncpgDriver(scheduler_conn)
        sm = SchedulerManager(scheduler_driver)
        register_scheduled_jobs(sm, db_pool)
        logger.info("Worker started, processing queue jobs and scheduled jobs...")
        # Run both managers concurrently - if either fails, both stop
        queue_task = asyncio.create_task(qm.run(), name="queue_manager")
        scheduler_task = asyncio.create_task(sm.run(), name="scheduler_manager")
        done, pending = await asyncio.wait(
            [queue_task, scheduler_task],
            return_when=asyncio.FIRST_EXCEPTION,
        )
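        # Note: FIRST_EXCEPTION also returns once all tasks finish normally;
        # since both managers run until cancelled, in practice this blocks for
        # the worker's lifetime and only returns early on a failure.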
        # Cancel any pending tasks
        for task in pending:
            task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await task
        # Check for exceptions in completed tasks
        for task in done:
            exc = task.exception()
            if exc is not None:
                logger.error(f"Task '{task.get_name()}' failed: {exc}")
                raise exc
    finally:
        await queue_conn.close()
        await scheduler_conn.close()
        await db_pool.close()
        logger.info("Worker stopped")


if __name__ == "__main__":
    asyncio.run(main())
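

# Typical invocation (assumed): run as a single long-lived process alongside
# the API, e.g.
#
#     python worker.py
#
# with the database reachable at the ASYNCPG_DATABASE_URL imported from the
# `database` module.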