test: add unit tests for scheduled Bitcoin price job handler

This commit is contained in:
counterweight 2025-12-22 15:53:05 +01:00
parent cd2285395d
commit 9db43c474e
Signed by: counterweight
GPG key ID: 883EDBAA726BD96C
2 changed files with 121 additions and 21 deletions

View file

@ -1,5 +1,6 @@
"""Tests for price history feature.""" """Tests for price history feature."""
from contextlib import asynccontextmanager
from datetime import UTC, datetime from datetime import UTC, datetime
from unittest.mock import AsyncMock, MagicMock, patch from unittest.mock import AsyncMock, MagicMock, patch
@ -7,6 +8,7 @@ import pytest
from models import PriceHistory from models import PriceHistory
from price_fetcher import fetch_btc_eur_price from price_fetcher import fetch_btc_eur_price
from worker import process_bitcoin_price_job
class TestFetchBtcEurPrice: class TestFetchBtcEurPrice:
@ -238,3 +240,91 @@ class TestManualFetch:
data = response.json() data = response.json()
assert len(data) == 1 assert len(data) == 1
assert data[0]["price"] == 87654.32 assert data[0]["price"] == 87654.32
def create_mock_pool(mock_conn: AsyncMock) -> MagicMock:
"""Create a mock asyncpg pool with proper async context manager behavior."""
mock_pool = MagicMock()
@asynccontextmanager
async def mock_acquire():
yield mock_conn
mock_pool.acquire = mock_acquire
return mock_pool
class TestProcessBitcoinPriceJob:
"""Tests for the scheduled Bitcoin price job handler."""
@pytest.mark.asyncio
async def test_stores_price_on_success(self):
"""Verify price is stored in database on successful fetch."""
mock_response = MagicMock()
mock_response.json.return_value = [0, 0, 0, 0, 0, 0, 95000.0, 0, 0, 0]
mock_response.raise_for_status = MagicMock()
mock_http_client = AsyncMock()
mock_http_client.get.return_value = mock_response
mock_http_client.__aenter__.return_value = mock_http_client
mock_http_client.__aexit__.return_value = None
mock_conn = AsyncMock()
mock_pool = create_mock_pool(mock_conn)
with patch("price_fetcher.httpx.AsyncClient", return_value=mock_http_client):
await process_bitcoin_price_job(mock_pool)
# Verify execute was called with correct values
mock_conn.execute.assert_called_once()
call_args = mock_conn.execute.call_args
# Check the SQL parameters
assert call_args[0][1] == "bitfinex" # source
assert call_args[0][2] == "BTC/EUR" # pair
assert call_args[0][3] == 95000.0 # price
@pytest.mark.asyncio
async def test_fails_silently_on_api_error(self):
"""Verify no exception is raised and no DB insert on API error."""
import httpx
mock_response = MagicMock()
mock_response.raise_for_status.side_effect = httpx.HTTPStatusError(
"Server Error", request=MagicMock(), response=MagicMock()
)
mock_http_client = AsyncMock()
mock_http_client.get.return_value = mock_response
mock_http_client.__aenter__.return_value = mock_http_client
mock_http_client.__aexit__.return_value = None
mock_conn = AsyncMock()
mock_pool = create_mock_pool(mock_conn)
with patch("price_fetcher.httpx.AsyncClient", return_value=mock_http_client):
# Should not raise an exception
await process_bitcoin_price_job(mock_pool)
# Should not have called execute
mock_conn.execute.assert_not_called()
@pytest.mark.asyncio
async def test_fails_silently_on_db_error(self):
"""Verify no exception is raised on database error."""
mock_response = MagicMock()
mock_response.json.return_value = [0, 0, 0, 0, 0, 0, 95000.0, 0, 0, 0]
mock_response.raise_for_status = MagicMock()
mock_http_client = AsyncMock()
mock_http_client.get.return_value = mock_response
mock_http_client.__aenter__.return_value = mock_http_client
mock_http_client.__aexit__.return_value = None
mock_conn = AsyncMock()
mock_conn.execute.side_effect = Exception("Database connection error")
mock_pool = create_mock_pool(mock_conn)
with patch("price_fetcher.httpx.AsyncClient", return_value=mock_http_client):
# Should not raise an exception
await process_bitcoin_price_job(mock_pool)

View file

@ -98,13 +98,13 @@ def register_job_handlers(qm: QueueManager, db_pool: asyncpg.Pool) -> None:
await process_random_number_job(job, db_pool) await process_random_number_job(job, db_pool)
def register_scheduled_jobs(sm: SchedulerManager, db_pool: asyncpg.Pool) -> None: async def process_bitcoin_price_job(db_pool: asyncpg.Pool) -> None:
"""Register all scheduled jobs with the scheduler manager.""" """
Fetch and store Bitcoin price from Bitfinex.
# Run every minute: "* * * * *" means every minute of every hour of every day This function is designed to fail silently - exceptions are caught and logged
@sm.schedule(JOB_FETCH_BITCOIN_PRICE, "* * * * *") so the scheduler can continue with the next scheduled run.
async def fetch_bitcoin_price(schedule: Schedule) -> None: """
"""Fetch Bitcoin price from Bitfinex every minute."""
try: try:
price, timestamp = await fetch_btc_eur_price() price, timestamp = await fetch_btc_eur_price()
@ -128,6 +128,16 @@ def register_scheduled_jobs(sm: SchedulerManager, db_pool: asyncpg.Pool) -> None
logger.error(f"Failed to fetch Bitcoin price: {e}") logger.error(f"Failed to fetch Bitcoin price: {e}")
def register_scheduled_jobs(sm: SchedulerManager, db_pool: asyncpg.Pool) -> None:
"""Register all scheduled jobs with the scheduler manager."""
# Run every minute: "* * * * *" means every minute of every hour of every day
@sm.schedule(JOB_FETCH_BITCOIN_PRICE, "* * * * *")
async def fetch_bitcoin_price(schedule: Schedule) -> None:
"""Fetch Bitcoin price from Bitfinex every minute."""
await process_bitcoin_price_job(db_pool)
async def main() -> None: async def main() -> None:
"""Main worker entry point.""" """Main worker entry point."""
logger.info("Installing pgqueuer schema...") logger.info("Installing pgqueuer schema...")