"""Price repository for database queries.""" from datetime import datetime from sqlalchemy import desc, select from sqlalchemy.ext.asyncio import AsyncSession from models import PriceHistory from price_fetcher import PAIR_BTC_EUR, SOURCE_BITFINEX class PriceRepository: """Repository for price-related database queries.""" def __init__(self, db: AsyncSession): self.db = db async def get_latest( self, source: str = SOURCE_BITFINEX, pair: str = PAIR_BTC_EUR ) -> PriceHistory | None: """Get the most recent price from the database.""" query = ( select(PriceHistory) .where(PriceHistory.source == source, PriceHistory.pair == pair) .order_by(desc(PriceHistory.timestamp)) .limit(1) ) result = await self.db.execute(query) return result.scalar_one_or_none() async def get_recent(self, limit: int = 20) -> list[PriceHistory]: """Get the most recent price history records.""" query = select(PriceHistory).order_by(desc(PriceHistory.timestamp)).limit(limit) result = await self.db.execute(query) return list(result.scalars().all()) async def get_by_timestamp( self, timestamp: str | datetime, source: str = SOURCE_BITFINEX, pair: str = PAIR_BTC_EUR, ) -> PriceHistory | None: """Get a price record by timestamp.""" # Convert string timestamp to datetime if needed timestamp_dt: datetime if isinstance(timestamp, str): timestamp_dt = datetime.fromisoformat(timestamp.replace("Z", "+00:00")) else: timestamp_dt = timestamp result = await self.db.execute( select(PriceHistory).where( PriceHistory.source == source, PriceHistory.pair == pair, PriceHistory.timestamp == timestamp_dt, ) ) return result.scalar_one_or_none()