From 7beb213cf5b5e3d77af2bc5abb83e7c81451ef9e Mon Sep 17 00:00:00 2001 From: counterweight Date: Sun, 21 Dec 2025 22:50:35 +0100 Subject: [PATCH] Phase 3: Outcome storage - Add RandomNumberOutcome model to models.py - Update worker.py to execute job logic: - Generate random number 0-100 - Record execution duration - Store outcome in database - Add test_jobs.py with unit tests for job handler logic --- backend/models.py | 21 +++++ backend/tests/test_jobs.py | 173 +++++++++++++++++++++++++++++++++++++ backend/worker.py | 73 +++++++++++++--- 3 files changed, 257 insertions(+), 10 deletions(-) create mode 100644 backend/tests/test_jobs.py diff --git a/backend/models.py b/backend/models.py index a00f22f..0244a42 100644 --- a/backend/models.py +++ b/backend/models.py @@ -351,3 +351,24 @@ class Appointment(Base): cancelled_at: Mapped[datetime | None] = mapped_column( DateTime(timezone=True), nullable=True ) + + +class RandomNumberOutcome(Base): + """Outcome of a random number job execution.""" + + __tablename__ = "random_number_outcomes" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + job_id: Mapped[int] = mapped_column(Integer, nullable=False, index=True) + triggered_by_user_id: Mapped[int] = mapped_column( + Integer, ForeignKey("users.id"), nullable=False, index=True + ) + triggered_by: Mapped[User] = relationship( + "User", foreign_keys=[triggered_by_user_id], lazy="joined" + ) + value: Mapped[int] = mapped_column(Integer, nullable=False) + duration_ms: Mapped[int] = mapped_column(Integer, nullable=False) + status: Mapped[str] = mapped_column(String(20), nullable=False, default="completed") + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), default=lambda: datetime.now(UTC) + ) diff --git a/backend/tests/test_jobs.py b/backend/tests/test_jobs.py new file mode 100644 index 0000000..5b64a43 --- /dev/null +++ b/backend/tests/test_jobs.py @@ -0,0 +1,173 @@ +"""Tests for job handler logic.""" + +import json +from contextlib import asynccontextmanager +from unittest.mock import AsyncMock, MagicMock + +import pytest + + +def create_mock_pool(mock_conn: AsyncMock) -> MagicMock: + """Create a mock asyncpg pool with proper async context manager behavior.""" + mock_pool = MagicMock() + + @asynccontextmanager + async def mock_acquire(): + yield mock_conn + + mock_pool.acquire = mock_acquire + return mock_pool + + +class TestRandomNumberJobHandler: + """Tests for the random number job handler logic.""" + + @pytest.mark.asyncio + async def test_generates_random_number_in_range(self): + """Verify random number is in range [0, 100].""" + from worker import process_random_number_job + + # Create mock job + job = MagicMock() + job.id = 123 + job.payload = json.dumps({"user_id": 1}).encode() + + # Create mock db pool + mock_conn = AsyncMock() + mock_pool = create_mock_pool(mock_conn) + + # Run the job handler + await process_random_number_job(job, mock_pool) + + # Verify execute was called + mock_conn.execute.assert_called_once() + call_args = mock_conn.execute.call_args + + # Extract the value argument (position 3 in the args) + # Args: (query, job_id, user_id, value, duration_ms, status) + value = call_args[0][3] + + assert 0 <= value <= 100, f"Value {value} is not in range [0, 100]" + + @pytest.mark.asyncio + async def test_stores_correct_user_id(self): + """Verify the correct user_id is stored in the outcome.""" + from worker import process_random_number_job + + user_id = 42 + + job = MagicMock() + job.id = 123 + job.payload = json.dumps({"user_id": user_id}).encode() + + mock_conn = AsyncMock() + mock_pool = create_mock_pool(mock_conn) + + await process_random_number_job(job, mock_pool) + + mock_conn.execute.assert_called_once() + call_args = mock_conn.execute.call_args + + # Args: (query, job_id, user_id, value, duration_ms, status) + stored_user_id = call_args[0][2] + assert stored_user_id == user_id + + @pytest.mark.asyncio + async def test_stores_job_id(self): + """Verify the job_id is stored in the outcome.""" + from worker import process_random_number_job + + job_id = 456 + + job = MagicMock() + job.id = job_id + job.payload = json.dumps({"user_id": 1}).encode() + + mock_conn = AsyncMock() + mock_pool = create_mock_pool(mock_conn) + + await process_random_number_job(job, mock_pool) + + mock_conn.execute.assert_called_once() + call_args = mock_conn.execute.call_args + + # Args: (query, job_id, user_id, value, duration_ms, status) + stored_job_id = call_args[0][1] + assert stored_job_id == job_id + + @pytest.mark.asyncio + async def test_stores_status_completed(self): + """Verify the status is set to 'completed'.""" + from worker import process_random_number_job + + job = MagicMock() + job.id = 123 + job.payload = json.dumps({"user_id": 1}).encode() + + mock_conn = AsyncMock() + mock_pool = create_mock_pool(mock_conn) + + await process_random_number_job(job, mock_pool) + + mock_conn.execute.assert_called_once() + call_args = mock_conn.execute.call_args + + # Args: (query, job_id, user_id, value, duration_ms, status) + status = call_args[0][5] + assert status == "completed" + + @pytest.mark.asyncio + async def test_records_duration_ms(self): + """Verify duration_ms is recorded (should be >= 0).""" + from worker import process_random_number_job + + job = MagicMock() + job.id = 123 + job.payload = json.dumps({"user_id": 1}).encode() + + mock_conn = AsyncMock() + mock_pool = create_mock_pool(mock_conn) + + await process_random_number_job(job, mock_pool) + + mock_conn.execute.assert_called_once() + call_args = mock_conn.execute.call_args + + # Args: (query, job_id, user_id, value, duration_ms, status) + duration_ms = call_args[0][4] + assert isinstance(duration_ms, int) + assert duration_ms >= 0 + + @pytest.mark.asyncio + async def test_missing_user_id_does_not_insert(self): + """Verify no insert happens if user_id is missing from payload.""" + from worker import process_random_number_job + + job = MagicMock() + job.id = 123 + job.payload = json.dumps({}).encode() # Missing user_id + + mock_conn = AsyncMock() + mock_pool = create_mock_pool(mock_conn) + + await process_random_number_job(job, mock_pool) + + # Should not have called execute + mock_conn.execute.assert_not_called() + + @pytest.mark.asyncio + async def test_empty_payload_does_not_insert(self): + """Verify no insert happens with empty payload.""" + from worker import process_random_number_job + + job = MagicMock() + job.id = 123 + job.payload = None + + mock_conn = AsyncMock() + mock_pool = create_mock_pool(mock_conn) + + await process_random_number_job(job, mock_pool) + + # Should not have called execute + mock_conn.execute.assert_not_called() diff --git a/backend/worker.py b/backend/worker.py index 68c2bb5..34e6d7d 100644 --- a/backend/worker.py +++ b/backend/worker.py @@ -1,8 +1,11 @@ """Background job worker using pgqueuer.""" import asyncio +import json import logging import os +import random +import time import asyncpg from pgqueuer import Job, QueueManager @@ -37,14 +40,60 @@ async def install_schema() -> None: await conn.close() -def register_job_handlers(qm: QueueManager) -> None: +async def process_random_number_job(job: Job, db_pool: asyncpg.Pool) -> None: + """ + Process a random number job. + + - Parse user_id from payload + - Generate random number 0-100 + - Record execution duration + - Store outcome in database + """ + start_time = time.perf_counter() + + # Parse payload + payload_str = job.payload.decode() if job.payload else "{}" + payload = json.loads(payload_str) + user_id = payload.get("user_id") + + if user_id is None: + logger.error(f"Job {job.id}: Missing user_id in payload") + return + + # Generate random number + value = random.randint(0, 100) + + # Calculate duration + duration_ms = int((time.perf_counter() - start_time) * 1000) + + # Store outcome + async with db_pool.acquire() as conn: + await conn.execute( + """ + INSERT INTO random_number_outcomes + (job_id, triggered_by_user_id, value, duration_ms, status, created_at) + VALUES ($1, $2, $3, $4, $5, NOW()) + """, + job.id, + user_id, + value, + duration_ms, + "completed", + ) + + logger.info( + f"Job {job.id}: Generated random number {value} for user {user_id} " + f"(duration: {duration_ms}ms)" + ) + + +def register_job_handlers(qm: QueueManager, db_pool: asyncpg.Pool) -> None: """Register all job handlers with the queue manager.""" @qm.entrypoint("random_number") - async def process_random_number(job: Job) -> None: - """Process a random number job (placeholder - just logs for now).""" - payload_str = job.payload.decode() if job.payload else "" - logger.info(f"Processing random_number job {job.id}: {payload_str}") + async def handle_random_number(job: Job) -> None: + """Handle random_number job entrypoint.""" + await process_random_number_job(job, db_pool) async def main() -> None: @@ -53,19 +102,23 @@ async def main() -> None: await install_schema() logger.info("Connecting to database...") - conn = await asyncpg.connect(DATABASE_URL) + # Connection for pgqueuer + queue_conn = await asyncpg.connect(DATABASE_URL) + # Connection pool for application data + db_pool = await asyncpg.create_pool(DATABASE_URL, min_size=1, max_size=5) try: - driver = AsyncpgDriver(conn) + driver = AsyncpgDriver(queue_conn) qm = QueueManager(driver) - # Register job handlers - register_job_handlers(qm) + # Register job handlers with access to db pool + register_job_handlers(qm, db_pool) logger.info("Worker started, waiting for jobs...") await qm.run() finally: - await conn.close() + await queue_conn.close() + await db_pool.close() logger.info("Worker stopped")