diff --git a/backend/worker.py b/backend/worker.py index d3069e4..4ebc123 100644 --- a/backend/worker.py +++ b/backend/worker.py @@ -7,12 +7,14 @@ import random import time import asyncpg -from pgqueuer import Job, QueueManager +from pgqueuer import Job, QueueManager, SchedulerManager from pgqueuer.db import AsyncpgDriver +from pgqueuer.models import Schedule from pgqueuer.queries import Queries from database import ASYNCPG_DATABASE_URL -from jobs import JOB_RANDOM_NUMBER +from jobs import JOB_FETCH_BITCOIN_PRICE, JOB_RANDOM_NUMBER +from price_fetcher import fetch_btc_eur_price logging.basicConfig( level=logging.INFO, @@ -96,28 +98,70 @@ def register_job_handlers(qm: QueueManager, db_pool: asyncpg.Pool) -> None: await process_random_number_job(job, db_pool) +def register_scheduled_jobs(sm: SchedulerManager, db_pool: asyncpg.Pool) -> None: + """Register all scheduled jobs with the scheduler manager.""" + + # Run every minute: "* * * * *" means every minute of every hour of every day + @sm.schedule(JOB_FETCH_BITCOIN_PRICE, "* * * * *") + async def fetch_bitcoin_price(schedule: Schedule) -> None: + """Fetch Bitcoin price from Bitfinex every minute.""" + try: + price, timestamp = await fetch_btc_eur_price() + + async with db_pool.acquire() as conn: + await conn.execute( + """ + INSERT INTO price_history + (source, pair, price, timestamp, created_at) + VALUES ($1, $2, $3, $4, NOW()) + ON CONFLICT (source, pair, timestamp) DO NOTHING + """, + "bitfinex", + "BTC/EUR", + price, + timestamp, + ) + + logger.info(f"Fetched BTC/EUR price: €{price:.2f}") + except Exception as e: + # Fail silently - next scheduled job will continue + logger.error(f"Failed to fetch Bitcoin price: {e}") + + async def main() -> None: """Main worker entry point.""" logger.info("Installing pgqueuer schema...") await install_schema() logger.info("Connecting to database...") - # Connection for pgqueuer + # Connection for queue manager queue_conn = await asyncpg.connect(ASYNCPG_DATABASE_URL) + # Connection for scheduler manager + scheduler_conn = await asyncpg.connect(ASYNCPG_DATABASE_URL) # Connection pool for application data db_pool = await asyncpg.create_pool(ASYNCPG_DATABASE_URL, min_size=1, max_size=5) try: - driver = AsyncpgDriver(queue_conn) - qm = QueueManager(driver) - - # Register job handlers with access to db pool + # Setup queue manager for on-demand jobs + queue_driver = AsyncpgDriver(queue_conn) + qm = QueueManager(queue_driver) register_job_handlers(qm, db_pool) - logger.info("Worker started, waiting for jobs...") - await qm.run() + # Setup scheduler manager for periodic jobs + scheduler_driver = AsyncpgDriver(scheduler_conn) + sm = SchedulerManager(scheduler_driver) + register_scheduled_jobs(sm, db_pool) + + logger.info("Worker started, processing queue jobs and scheduled jobs...") + + # Run both managers concurrently + await asyncio.gather( + qm.run(), + sm.run(), + ) finally: await queue_conn.close() + await scheduler_conn.close() await db_pool.close() logger.info("Worker stopped")