Extract price logic to PriceService

- Create PriceService with get_recent_prices() and fetch_and_store_price()
- Update routes/audit.py to use PriceService instead of direct queries
- Use PriceHistoryMapper consistently
- Update test to patch services.price.fetch_btc_eur_price
This commit is contained in:
counterweight 2025-12-25 18:30:26 +01:00
parent 168b67acee
commit badb45da59
Signed by: counterweight
GPG key ID: 883EDBAA726BD96C
4 changed files with 324 additions and 50 deletions

View file

@ -1,36 +1,22 @@
"""Audit routes for price history."""
from fastapi import APIRouter, Depends
from sqlalchemy import desc, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from auth import require_permission
from database import get_db
from models import Permission, PriceHistory, User
from price_fetcher import PAIR_BTC_EUR, SOURCE_BITFINEX, fetch_btc_eur_price
from mappers import PriceHistoryMapper
from models import Permission, User
from schemas import PriceHistoryResponse
from services.price import PriceService
router = APIRouter(prefix="/api/audit", tags=["audit"])
def _to_price_history_response(record: PriceHistory) -> PriceHistoryResponse:
return PriceHistoryResponse(
id=record.id,
source=record.source,
pair=record.pair,
price=record.price,
timestamp=record.timestamp,
created_at=record.created_at,
)
# =============================================================================
# Price History Endpoints
# =============================================================================
PRICE_HISTORY_LIMIT = 20
@router.get("/price-history", response_model=list[PriceHistoryResponse])
async def get_price_history(
@ -38,15 +24,10 @@ async def get_price_history(
_current_user: User = Depends(require_permission(Permission.VIEW_AUDIT)),
) -> list[PriceHistoryResponse]:
"""Get the 20 most recent price history records."""
query = (
select(PriceHistory)
.order_by(desc(PriceHistory.timestamp))
.limit(PRICE_HISTORY_LIMIT)
)
result = await db.execute(query)
records = result.scalars().all()
service = PriceService(db)
records = await service.get_recent_prices()
return [_to_price_history_response(record) for record in records]
return [PriceHistoryMapper.to_response(record) for record in records]
@router.post("/price-history/fetch", response_model=PriceHistoryResponse)
@ -55,28 +36,7 @@ async def fetch_price_now(
_current_user: User = Depends(require_permission(Permission.FETCH_PRICE)),
) -> PriceHistoryResponse:
"""Manually trigger a price fetch from Bitfinex."""
price, timestamp = await fetch_btc_eur_price()
service = PriceService(db)
record = await service.fetch_and_store_price()
record = PriceHistory(
source=SOURCE_BITFINEX,
pair=PAIR_BTC_EUR,
price=price,
timestamp=timestamp,
)
db.add(record)
try:
await db.commit()
await db.refresh(record)
except IntegrityError:
# Duplicate timestamp - return the existing record
await db.rollback()
query = select(PriceHistory).where(
PriceHistory.source == SOURCE_BITFINEX,
PriceHistory.pair == PAIR_BTC_EUR,
PriceHistory.timestamp == timestamp,
)
result = await db.execute(query)
record = result.scalar_one()
return _to_price_history_response(record)
return PriceHistoryMapper.to_response(record)

70
backend/services/price.py Normal file
View file

@ -0,0 +1,70 @@
"""Price service for fetching and managing price history."""
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from exceptions import ConflictError
from models import PriceHistory
from price_fetcher import PAIR_BTC_EUR, SOURCE_BITFINEX, fetch_btc_eur_price
from repositories.price import PriceRepository
PRICE_HISTORY_LIMIT = 20
class PriceService:
"""Service for price-related business logic."""
def __init__(self, db: AsyncSession):
self.db = db
self.price_repo = PriceRepository(db)
async def get_recent_prices(
self, limit: int = PRICE_HISTORY_LIMIT
) -> list[PriceHistory]:
"""
Get recent price history records.
Args:
limit: Maximum number of records to return (default: 20)
Returns:
List of PriceHistory records, most recent first
"""
return await self.price_repo.get_recent(limit)
async def fetch_and_store_price(self) -> PriceHistory:
"""
Fetch price from Bitfinex and store it in the database.
Handles duplicate timestamp conflicts by returning the existing record.
Returns:
PriceHistory record (newly created or existing if duplicate)
Raises:
ConflictError: If unable to fetch or store price after retries
"""
price_value, timestamp = await fetch_btc_eur_price()
record = PriceHistory(
source=SOURCE_BITFINEX,
pair=PAIR_BTC_EUR,
price=price_value,
timestamp=timestamp,
)
self.db.add(record)
try:
await self.db.commit()
await self.db.refresh(record)
return record
except IntegrityError:
# Duplicate timestamp - return the existing record
await self.db.rollback()
existing_record = await self.price_repo.get_by_timestamp(
timestamp, SOURCE_BITFINEX, PAIR_BTC_EUR
)
if existing_record:
return existing_record
# This should not happen, but handle gracefully
raise ConflictError("Failed to fetch or store price") from None

View file

@ -280,7 +280,7 @@ class TestManualFetch:
existing_id = existing.id
# Mock fetch_btc_eur_price to return the same timestamp
with patch("routes.audit.fetch_btc_eur_price") as mock_fetch:
with patch("services.price.fetch_btc_eur_price") as mock_fetch:
mock_fetch.return_value = (95000.0, fixed_timestamp)
async with client_factory.create(cookies=admin_user["cookies"]) as authed: