feat: add scheduled Bitcoin price fetch job running every minute

This commit is contained in:
counterweight 2025-12-22 15:51:57 +01:00
parent 4e96b9ee28
commit cd2285395d
Signed by: counterweight
GPG key ID: 883EDBAA726BD96C

View file

@ -7,12 +7,14 @@ import random
import time import time
import asyncpg import asyncpg
from pgqueuer import Job, QueueManager from pgqueuer import Job, QueueManager, SchedulerManager
from pgqueuer.db import AsyncpgDriver from pgqueuer.db import AsyncpgDriver
from pgqueuer.models import Schedule
from pgqueuer.queries import Queries from pgqueuer.queries import Queries
from database import ASYNCPG_DATABASE_URL from database import ASYNCPG_DATABASE_URL
from jobs import JOB_RANDOM_NUMBER from jobs import JOB_FETCH_BITCOIN_PRICE, JOB_RANDOM_NUMBER
from price_fetcher import fetch_btc_eur_price
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
@ -96,28 +98,70 @@ def register_job_handlers(qm: QueueManager, db_pool: asyncpg.Pool) -> None:
await process_random_number_job(job, db_pool) await process_random_number_job(job, db_pool)
def register_scheduled_jobs(sm: SchedulerManager, db_pool: asyncpg.Pool) -> None:
"""Register all scheduled jobs with the scheduler manager."""
# Run every minute: "* * * * *" means every minute of every hour of every day
@sm.schedule(JOB_FETCH_BITCOIN_PRICE, "* * * * *")
async def fetch_bitcoin_price(schedule: Schedule) -> None:
"""Fetch Bitcoin price from Bitfinex every minute."""
try:
price, timestamp = await fetch_btc_eur_price()
async with db_pool.acquire() as conn:
await conn.execute(
"""
INSERT INTO price_history
(source, pair, price, timestamp, created_at)
VALUES ($1, $2, $3, $4, NOW())
ON CONFLICT (source, pair, timestamp) DO NOTHING
""",
"bitfinex",
"BTC/EUR",
price,
timestamp,
)
logger.info(f"Fetched BTC/EUR price: €{price:.2f}")
except Exception as e:
# Fail silently - next scheduled job will continue
logger.error(f"Failed to fetch Bitcoin price: {e}")
async def main() -> None: async def main() -> None:
"""Main worker entry point.""" """Main worker entry point."""
logger.info("Installing pgqueuer schema...") logger.info("Installing pgqueuer schema...")
await install_schema() await install_schema()
logger.info("Connecting to database...") logger.info("Connecting to database...")
# Connection for pgqueuer # Connection for queue manager
queue_conn = await asyncpg.connect(ASYNCPG_DATABASE_URL) queue_conn = await asyncpg.connect(ASYNCPG_DATABASE_URL)
# Connection for scheduler manager
scheduler_conn = await asyncpg.connect(ASYNCPG_DATABASE_URL)
# Connection pool for application data # Connection pool for application data
db_pool = await asyncpg.create_pool(ASYNCPG_DATABASE_URL, min_size=1, max_size=5) db_pool = await asyncpg.create_pool(ASYNCPG_DATABASE_URL, min_size=1, max_size=5)
try: try:
driver = AsyncpgDriver(queue_conn) # Setup queue manager for on-demand jobs
qm = QueueManager(driver) queue_driver = AsyncpgDriver(queue_conn)
qm = QueueManager(queue_driver)
# Register job handlers with access to db pool
register_job_handlers(qm, db_pool) register_job_handlers(qm, db_pool)
logger.info("Worker started, waiting for jobs...") # Setup scheduler manager for periodic jobs
await qm.run() scheduler_driver = AsyncpgDriver(scheduler_conn)
sm = SchedulerManager(scheduler_driver)
register_scheduled_jobs(sm, db_pool)
logger.info("Worker started, processing queue jobs and scheduled jobs...")
# Run both managers concurrently
await asyncio.gather(
qm.run(),
sm.run(),
)
finally: finally:
await queue_conn.close() await queue_conn.close()
await scheduler_conn.close()
await db_pool.close() await db_pool.close()
logger.info("Worker stopped") logger.info("Worker stopped")