Phase 3: Outcome storage

- Add RandomNumberOutcome model to models.py
- Update worker.py to execute job logic:
  - Generate random number 0-100
  - Record execution duration
  - Store outcome in database
- Add test_jobs.py with unit tests for job handler logic
This commit is contained in:
counterweight 2025-12-21 22:50:35 +01:00
parent 6ca0ae88dd
commit 7beb213cf5
Signed by: counterweight
GPG key ID: 883EDBAA726BD96C
3 changed files with 257 additions and 10 deletions

View file

@@ -1,8 +1,11 @@
"""Background job worker using pgqueuer."""
import asyncio
import json
import logging
import os
import random
import time
import asyncpg
from pgqueuer import Job, QueueManager
@@ -37,14 +40,60 @@ async def install_schema() -> None:
await conn.close()
def register_job_handlers(qm: QueueManager) -> None:
async def process_random_number_job(job: Job, db_pool: asyncpg.Pool) -> None:
    """
    Process a random number job.
    - Parse user_id from payload
    - Generate random number 0-100 (inclusive)
    - Record execution duration
    - Store outcome in database

    Args:
        job: pgqueuer job; ``job.payload`` is expected to be a JSON document
            containing a ``user_id`` key.
        db_pool: asyncpg pool used to persist the outcome row.
    """
    start_time = time.perf_counter()

    # Parse payload; a missing payload is treated as an empty object.
    payload_str = job.payload.decode() if job.payload else "{}"
    try:
        payload = json.loads(payload_str)
    except json.JSONDecodeError:
        # Previously a malformed payload propagated out of the handler;
        # log and drop the job instead so the worker keeps running.
        logger.error("Job %s: Malformed JSON payload: %r", job.id, payload_str)
        return

    user_id = payload.get("user_id")
    if user_id is None:
        logger.error("Job %s: Missing user_id in payload", job.id)
        return

    # Generate random number
    value = random.randint(0, 100)

    # Calculate duration (job logic only; excludes the database write below)
    duration_ms = int((time.perf_counter() - start_time) * 1000)

    # Store outcome
    async with db_pool.acquire() as conn:
        await conn.execute(
            """
            INSERT INTO random_number_outcomes
            (job_id, triggered_by_user_id, value, duration_ms, status, created_at)
            VALUES ($1, $2, $3, $4, $5, NOW())
            """,
            job.id,
            user_id,
            value,
            duration_ms,
            "completed",
        )

    # Lazy %-style args avoid formatting cost when INFO is disabled.
    logger.info(
        "Job %s: Generated random number %s for user %s (duration: %sms)",
        job.id,
        value,
        user_id,
        duration_ms,
    )
def register_job_handlers(qm: QueueManager, db_pool: asyncpg.Pool) -> None:
    """Register all job handlers with the queue manager.

    The handler closes over *db_pool* so the job processor can persist
    its outcome without any module-level state.
    """

    @qm.entrypoint("random_number")
    async def handle_random_number(job: Job) -> None:
        """Handle random_number job entrypoint."""
        await process_random_number_job(job, db_pool)
async def main() -> None:
@@ -53,19 +102,23 @@ async def main() -> None:
await install_schema()
logger.info("Connecting to database...")
conn = await asyncpg.connect(DATABASE_URL)
# Connection for pgqueuer
queue_conn = await asyncpg.connect(DATABASE_URL)
# Connection pool for application data
db_pool = await asyncpg.create_pool(DATABASE_URL, min_size=1, max_size=5)
try:
driver = AsyncpgDriver(conn)
driver = AsyncpgDriver(queue_conn)
qm = QueueManager(driver)
# Register job handlers
register_job_handlers(qm)
# Register job handlers with access to db pool
register_job_handlers(qm, db_pool)
logger.info("Worker started, waiting for jobs...")
await qm.run()
finally:
await conn.close()
await queue_conn.close()
await db_pool.close()
logger.info("Worker stopped")