"""Background job worker using pgqueuer.""" import asyncio import json import logging import random import time import asyncpg from pgqueuer import Job, QueueManager from pgqueuer.db import AsyncpgDriver from pgqueuer.queries import Queries from database import ASYNCPG_DATABASE_URL logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) logger = logging.getLogger("worker") async def install_schema() -> None: """Install pgqueuer schema if not already present.""" conn = await asyncpg.connect(ASYNCPG_DATABASE_URL) try: queries = Queries.from_asyncpg_connection(conn) # Check if schema is already installed by looking for the main table if not await queries.has_table("pgqueuer"): await queries.install() logger.info("pgqueuer schema installed") else: logger.info("pgqueuer schema already exists") finally: await conn.close() async def process_random_number_job(job: Job, db_pool: asyncpg.Pool) -> None: """ Process a random number job. - Parse user_id from payload - Generate random number 0-100 - Record execution duration - Store outcome in database """ start_time = time.perf_counter() # Parse payload payload_str = job.payload.decode() if job.payload else "{}" payload = json.loads(payload_str) user_id = payload.get("user_id") if user_id is None: logger.error(f"Job {job.id}: Missing user_id in payload") return # Generate random number value = random.randint(0, 100) # Calculate duration duration_ms = int((time.perf_counter() - start_time) * 1000) # Store outcome async with db_pool.acquire() as conn: await conn.execute( """ INSERT INTO random_number_outcomes (job_id, triggered_by_user_id, value, duration_ms, status, created_at) VALUES ($1, $2, $3, $4, $5, NOW()) """, job.id, user_id, value, duration_ms, "completed", ) logger.info( f"Job {job.id}: Generated random number {value} for user {user_id} " f"(duration: {duration_ms}ms)" ) def register_job_handlers(qm: QueueManager, db_pool: asyncpg.Pool) -> None: """Register all job handlers with the queue manager.""" @qm.entrypoint("random_number") async def handle_random_number(job: Job) -> None: """Handle random_number job entrypoint.""" await process_random_number_job(job, db_pool) async def main() -> None: """Main worker entry point.""" logger.info("Installing pgqueuer schema...") await install_schema() logger.info("Connecting to database...") # Connection for pgqueuer queue_conn = await asyncpg.connect(ASYNCPG_DATABASE_URL) # Connection pool for application data db_pool = await asyncpg.create_pool(ASYNCPG_DATABASE_URL, min_size=1, max_size=5) try: driver = AsyncpgDriver(queue_conn) qm = QueueManager(driver) # Register job handlers with access to db pool register_job_handlers(qm, db_pool) logger.info("Worker started, waiting for jobs...") await qm.run() finally: await queue_conn.close() await db_pool.close() logger.info("Worker stopped") if __name__ == "__main__": asyncio.run(main())