Unlike JOB_RANDOM_NUMBER, which is used for external job enqueueing, JOB_FETCH_BITCOIN_PRICE is used only internally by the scheduler. Move it to worker.py to clarify that it is not part of the public job API.
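
For context, a producer elsewhere in the app would enqueue JOB_RANDOM_NUMBER roughly like this (a minimal sketch: enqueue_random_number is a hypothetical helper, the list-based Queries.enqueue call is my reading of pgqueuer's batch enqueue API, and the connection handling mirrors install_schema below):

import json

import asyncpg
from pgqueuer.queries import Queries

from database import ASYNCPG_DATABASE_URL
from jobs import JOB_RANDOM_NUMBER


async def enqueue_random_number(user_id: int) -> None:
    """Enqueue one random_number job for the given user (illustrative only)."""
    conn = await asyncpg.connect(ASYNCPG_DATABASE_URL)
    try:
        queries = Queries.from_asyncpg_connection(conn)
        # The entrypoint name must match the @qm.entrypoint registration in worker.py.
        await queries.enqueue(
            [JOB_RANDOM_NUMBER],
            [json.dumps({"user_id": user_id}).encode()],
            [0],  # priority
        )
    finally:
        await conn.close()
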
"""Background job worker using pgqueuer."""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import random
|
|
import time
|
|
|
|
import asyncpg
|
|
from pgqueuer import Job, QueueManager, SchedulerManager
|
|
from pgqueuer.db import AsyncpgDriver
|
|
from pgqueuer.models import Schedule
|
|
from pgqueuer.queries import Queries
|
|
|
|
from database import ASYNCPG_DATABASE_URL
|
|
from jobs import JOB_RANDOM_NUMBER
|
|
from price_fetcher import PAIR_BTC_EUR, SOURCE_BITFINEX, fetch_btc_eur_price
|
|
|
|
# Scheduled job type (internal to worker, not enqueued externally)
JOB_FETCH_BITCOIN_PRICE = "fetch_bitcoin_price"

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("worker")


async def install_schema() -> None:
    """Install pgqueuer schema if not already present."""
    conn = await asyncpg.connect(ASYNCPG_DATABASE_URL)
    try:
        queries = Queries.from_asyncpg_connection(conn)
        # Check if schema is already installed by looking for the main table
        if not await queries.has_table("pgqueuer"):
            await queries.install()
            logger.info("pgqueuer schema installed")
        else:
            logger.info("pgqueuer schema already exists")
    finally:
        await conn.close()


async def process_random_number_job(job: Job, db_pool: asyncpg.Pool) -> None:
    """
    Process a random number job.

    - Parse user_id from the payload
    - Generate a random number from 0 to 100 (inclusive)
    - Record execution duration
    - Store the outcome in the database
    """
    start_time = time.perf_counter()

    # Parse payload
    payload_str = job.payload.decode() if job.payload else "{}"
    try:
        payload = json.loads(payload_str)
    except json.JSONDecodeError as e:
        logger.error(f"Job {job.id}: Invalid JSON payload: {e}")
        return

    user_id = payload.get("user_id")
    if user_id is None:
        logger.error(f"Job {job.id}: Missing user_id in payload")
        return

    # Generate random number
    value = random.randint(0, 100)

    # Calculate duration
    duration_ms = int((time.perf_counter() - start_time) * 1000)

    # Store outcome
    async with db_pool.acquire() as conn:
        await conn.execute(
            """
            INSERT INTO random_number_outcomes
                (job_id, triggered_by_user_id, value, duration_ms, status, created_at)
            VALUES ($1, $2, $3, $4, $5, NOW())
            """,
            job.id,
            user_id,
            value,
            duration_ms,
            "completed",
        )

    logger.info(
        f"Job {job.id}: Generated random number {value} for user {user_id} "
        f"(duration: {duration_ms}ms)"
    )
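

# random_number_outcomes is assumed to exist already. A table sketch the INSERT
# above would fit (an illustration, not the project's actual migration):
#
#   CREATE TABLE random_number_outcomes (
#       id BIGSERIAL PRIMARY KEY,
#       job_id BIGINT NOT NULL,
#       triggered_by_user_id BIGINT NOT NULL,
#       value INTEGER NOT NULL,
#       duration_ms INTEGER NOT NULL,
#       status TEXT NOT NULL,
#       created_at TIMESTAMPTZ NOT NULL
#   );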


def register_job_handlers(qm: QueueManager, db_pool: asyncpg.Pool) -> None:
    """Register all job handlers with the queue manager."""

    @qm.entrypoint(JOB_RANDOM_NUMBER)
    async def handle_random_number(job: Job) -> None:
        """Handle random_number job entrypoint."""
        await process_random_number_job(job, db_pool)


async def process_bitcoin_price_job(db_pool: asyncpg.Pool) -> None:
    """
    Fetch and store the Bitcoin price from Bitfinex.

    This function fails soft: exceptions are caught and logged so the scheduler
    can continue with the next scheduled run.
    """
    try:
        price, timestamp = await fetch_btc_eur_price()

        async with db_pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO price_history
                    (source, pair, price, timestamp, created_at)
                VALUES ($1, $2, $3, $4, NOW())
                ON CONFLICT (source, pair, timestamp) DO NOTHING
                """,
                SOURCE_BITFINEX,
                PAIR_BTC_EUR,
                price,
                timestamp,
            )

        logger.info(f"Fetched BTC/EUR price: €{price:.2f}")
    except Exception as e:
        # Log and swallow the error; the next scheduled run will still happen
        logger.error(f"Failed to fetch Bitcoin price: {e}")


def register_scheduled_jobs(sm: SchedulerManager, db_pool: asyncpg.Pool) -> None:
    """Register all scheduled jobs with the scheduler manager."""

    # Run every minute: "* * * * *" means every minute of every hour of every day
    @sm.schedule(JOB_FETCH_BITCOIN_PRICE, "* * * * *")
    async def fetch_bitcoin_price(schedule: Schedule) -> None:
        """Fetch Bitcoin price from Bitfinex every minute."""
        await process_bitcoin_price_job(db_pool)
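

# For reference, other five-field cron expressions the schedule above could use
# (assuming SchedulerManager accepts standard cron syntax, as "* * * * *" suggests):
#   "*/5 * * * *"   every five minutes
#   "0 * * * *"     at the top of every hour
#   "0 0 * * *"     once a day at midnight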


async def main() -> None:
    """Main worker entry point."""
    logger.info("Installing pgqueuer schema...")
    await install_schema()

    logger.info("Connecting to database...")
    # Connection for queue manager
    queue_conn = await asyncpg.connect(ASYNCPG_DATABASE_URL)
    # Connection for scheduler manager
    scheduler_conn = await asyncpg.connect(ASYNCPG_DATABASE_URL)
    # Connection pool for application data
    db_pool = await asyncpg.create_pool(ASYNCPG_DATABASE_URL, min_size=1, max_size=5)
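
    # Each manager gets a dedicated connection because AsyncpgDriver wraps a
    # single asyncpg connection (see the setup below); application inserts go
    # through the separate pool instead. (Rationale inferred from this file's
    # structure, not from pgqueuer documentation.)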
    try:
        # Set up the queue manager for on-demand jobs
        queue_driver = AsyncpgDriver(queue_conn)
        qm = QueueManager(queue_driver)
        register_job_handlers(qm, db_pool)

        # Set up the scheduler manager for periodic jobs
        scheduler_driver = AsyncpgDriver(scheduler_conn)
        sm = SchedulerManager(scheduler_driver)
        register_scheduled_jobs(sm, db_pool)

        logger.info("Worker started, processing queue jobs and scheduled jobs...")

        # Run both managers concurrently
        await asyncio.gather(
            qm.run(),
            sm.run(),
        )
    finally:
        await queue_conn.close()
        await scheduler_conn.close()
        await db_pool.close()
        logger.info("Worker stopped")


if __name__ == "__main__":
    asyncio.run(main())