arbret/backend/worker.py
counterweight de12300593
fix: use Python datetime for created_at in worker
Consistent with model default which uses datetime.now(UTC) rather than
SQL NOW() for created_at field.
2025-12-22 16:18:51 +01:00

"""Background job worker using pgqueuer."""
import asyncio
import json
import logging
import random
import time
from datetime import UTC, datetime
import asyncpg
from pgqueuer import Job, QueueManager, SchedulerManager
from pgqueuer.db import AsyncpgDriver
from pgqueuer.models import Schedule
from pgqueuer.queries import Queries
from database import ASYNCPG_DATABASE_URL
from jobs import JOB_RANDOM_NUMBER
from price_fetcher import PAIR_BTC_EUR, SOURCE_BITFINEX, fetch_btc_eur_price
# Scheduled job type (internal to worker, not enqueued externally)
JOB_FETCH_BITCOIN_PRICE = "fetch_bitcoin_price"
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("worker")

async def install_schema() -> None:
    """Install pgqueuer schema if not already present."""
    conn = await asyncpg.connect(ASYNCPG_DATABASE_URL)
    try:
        queries = Queries.from_asyncpg_connection(conn)
        # Check if schema is already installed by looking for the main table
        if not await queries.has_table("pgqueuer"):
            await queries.install()
            logger.info("pgqueuer schema installed")
        else:
            logger.info("pgqueuer schema already exists")
    finally:
        await conn.close()
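
# Queries.install() creates pgqueuer's tables and supporting objects; the
# has_table guard above keeps repeated worker startups idempotent.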

async def process_random_number_job(job: Job, db_pool: asyncpg.Pool) -> None:
    """
    Process a random number job.

    - Parse user_id from payload
    - Generate random number 0-100
    - Record execution duration
    - Store outcome in database
    """
    start_time = time.perf_counter()

    # Parse payload
    payload_str = job.payload.decode() if job.payload else "{}"
    try:
        payload = json.loads(payload_str)
    except json.JSONDecodeError as e:
        logger.error(f"Job {job.id}: Invalid JSON payload: {e}")
        return

    user_id = payload.get("user_id")
    if user_id is None:
        logger.error(f"Job {job.id}: Missing user_id in payload")
        return

    # Generate random number
    value = random.randint(0, 100)

    # Calculate duration
    duration_ms = int((time.perf_counter() - start_time) * 1000)
    # Store outcome; created_at uses datetime.now(UTC) to stay consistent with
    # the model default (see commit message) rather than SQL NOW()
    async with db_pool.acquire() as conn:
        await conn.execute(
            """
            INSERT INTO random_number_outcomes
            (job_id, triggered_by_user_id, value, duration_ms, status, created_at)
            VALUES ($1, $2, $3, $4, $5, $6)
            """,
            job.id,
            user_id,
            value,
            duration_ms,
            "completed",
            datetime.now(UTC),
        )

    logger.info(
        f"Job {job.id}: Generated random number {value} for user {user_id} "
        f"(duration: {duration_ms}ms)"
    )
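
# Example (sketch, not part of this module): enqueueing a random_number job
# from another process, assuming the same ASYNCPG_DATABASE_URL:
#
#     conn = await asyncpg.connect(ASYNCPG_DATABASE_URL)
#     queries = Queries.from_asyncpg_connection(conn)
#     await queries.enqueue(JOB_RANDOM_NUMBER, json.dumps({"user_id": 42}).encode())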

def register_job_handlers(qm: QueueManager, db_pool: asyncpg.Pool) -> None:
    """Register all job handlers with the queue manager."""

    @qm.entrypoint(JOB_RANDOM_NUMBER)
    async def handle_random_number(job: Job) -> None:
        """Handle random_number job entrypoint."""
        await process_random_number_job(job, db_pool)
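
# qm.entrypoint(name) routes queued jobs by their entrypoint string, so jobs
# enqueued as JOB_RANDOM_NUMBER are dispatched to handle_random_number above.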

async def process_bitcoin_price_job(db_pool: asyncpg.Pool) -> None:
    """
    Fetch and store Bitcoin price from Bitfinex.

    This function is designed to fail silently - exceptions are caught and logged
    so the scheduler can continue with the next scheduled run.
    """
    try:
        price, timestamp = await fetch_btc_eur_price()
        async with db_pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO price_history
                (source, pair, price, timestamp, created_at)
                VALUES ($1, $2, $3, $4, $5)
                ON CONFLICT (source, pair, timestamp) DO NOTHING
                """,
                SOURCE_BITFINEX,
                PAIR_BTC_EUR,
                price,
                timestamp,
                datetime.now(UTC),
            )
        logger.info(f"Fetched BTC/EUR price: €{price:.2f}")
    except Exception as e:
        # Fail silently - next scheduled job will continue
        logger.error(f"Failed to fetch Bitcoin price: {e}")

def register_scheduled_jobs(sm: SchedulerManager, db_pool: asyncpg.Pool) -> None:
    """Register all scheduled jobs with the scheduler manager."""

    # Run every minute: "* * * * *" means every minute of every hour of every day
    @sm.schedule(JOB_FETCH_BITCOIN_PRICE, "* * * * *")
    async def fetch_bitcoin_price(schedule: Schedule) -> None:
        """Fetch Bitcoin price from Bitfinex every minute."""
        await process_bitcoin_price_job(db_pool)
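
# Cron schedules have one-minute granularity; e.g. "*/5 * * * *" would run
# the fetch every five minutes instead.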

async def main() -> None:
    """Main worker entry point."""
    logger.info("Installing pgqueuer schema...")
    await install_schema()

    logger.info("Connecting to database...")
    # Connection for queue manager
    queue_conn = await asyncpg.connect(ASYNCPG_DATABASE_URL)
    # Connection for scheduler manager
    scheduler_conn = await asyncpg.connect(ASYNCPG_DATABASE_URL)
    # Connection pool for application data
    db_pool = await asyncpg.create_pool(ASYNCPG_DATABASE_URL, min_size=1, max_size=5)

    try:
        # Setup queue manager for on-demand jobs
        queue_driver = AsyncpgDriver(queue_conn)
        qm = QueueManager(queue_driver)
        register_job_handlers(qm, db_pool)

        # Setup scheduler manager for periodic jobs
        scheduler_driver = AsyncpgDriver(scheduler_conn)
        sm = SchedulerManager(scheduler_driver)
        register_scheduled_jobs(sm, db_pool)

        logger.info("Worker started, processing queue jobs and scheduled jobs...")
        # Run both managers concurrently
        await asyncio.gather(
            qm.run(),
            sm.run(),
        )
    finally:
        await queue_conn.close()
        await scheduler_conn.close()
        await db_pool.close()
        logger.info("Worker stopped")

if __name__ == "__main__":
    asyncio.run(main())
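
# To run the worker directly: `python worker.py`. Assumes database.py's
# ASYNCPG_DATABASE_URL points at a reachable Postgres instance.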