"""Exchange service for business logic related to Bitcoin trading.""" import uuid from datetime import UTC, date, datetime, timedelta from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import AsyncSession from date_validation import validate_date_in_range from exceptions import ( BadRequestError, ConflictError, NotFoundError, ServiceUnavailableError, ) from models import ( Availability, BitcoinTransferMethod, Exchange, ExchangeStatus, PriceHistory, TradeDirection, User, ) from repositories.availability import AvailabilityRepository from repositories.exchange import ExchangeRepository from repositories.price import PriceRepository from schemas import AvailableSlotsResponse, BookableSlot from shared_constants import ( EUR_TRADE_INCREMENT, EUR_TRADE_MAX, EUR_TRADE_MIN, LIGHTNING_MAX_EUR, PREMIUM_PERCENTAGE, PRICE_STALENESS_SECONDS, SLOT_DURATION_MINUTES, ) # Constants for satoshi calculations SATS_PER_BTC = 100_000_000 class ExchangeService: """Service for exchange-related business logic.""" def __init__(self, db: AsyncSession): self.db = db self.price_repo = PriceRepository(db) self.exchange_repo = ExchangeRepository(db) self.availability_repo = AvailabilityRepository(db) def apply_premium_for_direction( self, market_price: float, premium_percentage: int, direction: TradeDirection, ) -> float: """ Apply premium to market price based on trade direction. The premium is always favorable to the admin: - When user BUYS BTC: user pays MORE (market * (1 + premium/100)) - When user SELLS BTC: user receives LESS (market * (1 - premium/100)) """ if direction == TradeDirection.BUY: return market_price * (1 + premium_percentage / 100) else: # SELL return market_price * (1 - premium_percentage / 100) def calculate_sats_amount( self, eur_cents: int, price_eur_per_btc: float, ) -> int: """ Calculate satoshi amount from EUR cents and price. Args: eur_cents: Amount in EUR cents (e.g., 10000 = €100) price_eur_per_btc: Price in EUR per BTC Returns: Amount in satoshis """ eur_amount = eur_cents / 100 btc_amount = eur_amount / price_eur_per_btc return int(btc_amount * SATS_PER_BTC) def is_price_stale(self, price_timestamp: datetime) -> bool: """Check if a price is older than the staleness threshold.""" age_seconds = (datetime.now(UTC) - price_timestamp).total_seconds() return age_seconds > PRICE_STALENESS_SECONDS async def get_latest_price(self) -> PriceHistory | None: """Get the most recent price from the database.""" return await self.price_repo.get_latest() async def validate_slot_timing(self, slot_start: datetime) -> None: """Validate slot timing - compute valid boundaries from slot duration.""" valid_minutes = tuple(range(0, 60, SLOT_DURATION_MINUTES)) if slot_start.minute not in valid_minutes: raise BadRequestError( f"Slot must be on {SLOT_DURATION_MINUTES}-minute boundary" ) if slot_start.second != 0 or slot_start.microsecond != 0: raise BadRequestError( "Slot start time must not have seconds or microseconds" ) async def validate_slot_availability( self, slot_start: datetime, slot_date: date ) -> None: """Verify slot falls within availability.""" from repositories.availability import AvailabilityRepository slot_start_time = slot_start.time() slot_end_dt = slot_start + timedelta(minutes=SLOT_DURATION_MINUTES) slot_end_time = slot_end_dt.time() availability_repo = AvailabilityRepository(self.db) availabilities = await availability_repo.get_by_date(slot_date) # Check if any availability block contains this slot matching_availability = None for avail in availabilities: if avail.start_time <= slot_start_time and avail.end_time >= slot_end_time: matching_availability = avail break if not matching_availability: slot_str = slot_start.strftime("%Y-%m-%d %H:%M") raise BadRequestError(f"Selected slot at {slot_str} UTC is not available") async def validate_price_not_stale(self) -> PriceHistory: """Validate price exists and is not stale.""" cached_price = await self.get_latest_price() if cached_price is None: raise ServiceUnavailableError( "Price data unavailable. Please try again later." ) if self.is_price_stale(cached_price.timestamp): raise ServiceUnavailableError( "Price is stale. Please refresh and try again." ) return cached_price async def validate_eur_amount(self, eur_amount: int) -> None: """Validate EUR amount is within configured limits.""" if eur_amount < EUR_TRADE_MIN * 100: raise BadRequestError(f"EUR amount must be at least €{EUR_TRADE_MIN}") if eur_amount > EUR_TRADE_MAX * 100: raise BadRequestError(f"EUR amount must be at most €{EUR_TRADE_MAX}") if eur_amount % (EUR_TRADE_INCREMENT * 100) != 0: raise BadRequestError( f"EUR amount must be a multiple of €{EUR_TRADE_INCREMENT}" ) async def validate_lightning_threshold( self, bitcoin_transfer_method: BitcoinTransferMethod, eur_amount: int ) -> None: """Validate Lightning threshold.""" if ( bitcoin_transfer_method == BitcoinTransferMethod.LIGHTNING and eur_amount > LIGHTNING_MAX_EUR * 100 ): raise BadRequestError( f"Lightning payments are only allowed for amounts up to " f"€{LIGHTNING_MAX_EUR}. For amounts above €{LIGHTNING_MAX_EUR}, " "please use onchain transactions." ) async def check_existing_trade_on_date( self, user: User, slot_date: date ) -> Exchange | None: """Check if user already has a trade on this date.""" exchanges = await self.exchange_repo.get_by_user_and_date_range( user_id=user.id, start_date=slot_date, end_date=slot_date, status=ExchangeStatus.BOOKED, ) return exchanges[0] if exchanges else None async def check_slot_already_booked(self, slot_start: datetime) -> Exchange | None: """Check if slot is already booked (only consider BOOKED status).""" return await self.exchange_repo.get_by_slot_start( slot_start, status=ExchangeStatus.BOOKED ) async def create_exchange( self, user: User, slot_start: datetime, direction: TradeDirection, bitcoin_transfer_method: BitcoinTransferMethod, eur_amount: int, ) -> Exchange: """ Create a new exchange trade booking with all business validation. Raises: BadRequestError: For validation failures ConflictError: If slot is already booked or user has trade on date ServiceUnavailableError: If price is unavailable or stale """ slot_date = slot_start.date() validate_date_in_range(slot_date, context="book") # Check if user already has a trade on this date existing_trade = await self.check_existing_trade_on_date(user, slot_date) if existing_trade: raise BadRequestError( f"You already have a trade booked on {slot_date.strftime('%Y-%m-%d')}. " f"Only one trade per day is allowed. " f"Trade ID: {existing_trade.public_id}" ) # Validate EUR amount await self.validate_eur_amount(eur_amount) # Validate Lightning threshold await self.validate_lightning_threshold(bitcoin_transfer_method, eur_amount) # Validate slot timing await self.validate_slot_timing(slot_start) # Verify slot falls within availability await self.validate_slot_availability(slot_start, slot_date) # Get and validate price cached_price = await self.validate_price_not_stale() # Calculate agreed price based on direction market_price = cached_price.price agreed_price = self.apply_premium_for_direction( market_price, PREMIUM_PERCENTAGE, direction ) # Calculate sats amount based on agreed price sats_amount = self.calculate_sats_amount(eur_amount, agreed_price) # Check if slot is already booked slot_booked = await self.check_slot_already_booked(slot_start) if slot_booked: slot_str = slot_start.strftime("%Y-%m-%d %H:%M") raise ConflictError( f"This slot at {slot_str} UTC has already been booked. " "Select another slot." ) # Create the exchange slot_end_dt = slot_start + timedelta(minutes=SLOT_DURATION_MINUTES) exchange = Exchange( user_id=user.id, slot_start=slot_start, slot_end=slot_end_dt, direction=direction, bitcoin_transfer_method=bitcoin_transfer_method, eur_amount=eur_amount, sats_amount=sats_amount, market_price_eur=market_price, agreed_price_eur=agreed_price, premium_percentage=PREMIUM_PERCENTAGE, status=ExchangeStatus.BOOKED, ) try: return await self.exchange_repo.create(exchange) except IntegrityError as e: await self.db.rollback() # This should rarely happen now since we check explicitly above, # but keep it for other potential integrity violations raise ConflictError( "Database constraint violation. Please try again." ) from e async def get_exchange_by_public_id( self, public_id: uuid.UUID, user: User | None = None ) -> Exchange: """ Get an exchange by public ID, optionally checking ownership. Raises: NotFoundError: If exchange not found or user doesn't own it (for security, returns 404) """ exchange = await self.exchange_repo.get_by_public_id(public_id) if not exchange: raise NotFoundError("Trade") # Check ownership if user is provided - return 404 for security # (prevents info leakage) if user and exchange.user_id != user.id: raise NotFoundError("Trade") return exchange async def cancel_exchange( self, exchange: Exchange, user: User, is_admin: bool = False ) -> Exchange: """ Cancel an exchange trade. Raises: BadRequestError: If cancellation is not allowed NotFoundError: If user doesn't own the exchange (when not admin, returns 404 for security) """ if not is_admin and exchange.user_id != user.id: raise NotFoundError("Trade") if exchange.status != ExchangeStatus.BOOKED: raise BadRequestError(f"Cannot cancel: status is '{exchange.status.value}'") if exchange.slot_start <= datetime.now(UTC): raise BadRequestError("Cannot cancel: trade slot time has already passed") exchange.status = ( ExchangeStatus.CANCELLED_BY_ADMIN if is_admin else ExchangeStatus.CANCELLED_BY_USER ) exchange.cancelled_at = datetime.now(UTC) return await self.exchange_repo.update(exchange) async def complete_exchange(self, exchange: Exchange) -> Exchange: """ Mark an exchange as completed. Raises: BadRequestError: If completion is not allowed """ if exchange.slot_start > datetime.now(UTC): raise BadRequestError("Cannot complete: trade slot has not yet started") if exchange.status != ExchangeStatus.BOOKED: raise BadRequestError( f"Cannot complete: status is '{exchange.status.value}'" ) exchange.status = ExchangeStatus.COMPLETED exchange.completed_at = datetime.now(UTC) return await self.exchange_repo.update(exchange) async def mark_no_show(self, exchange: Exchange) -> Exchange: """ Mark an exchange as no-show. Raises: BadRequestError: If marking as no-show is not allowed """ if exchange.slot_start > datetime.now(UTC): raise BadRequestError( "Cannot mark as no-show: trade slot has not yet started" ) if exchange.status != ExchangeStatus.BOOKED: raise BadRequestError( f"Cannot mark as no-show: status is '{exchange.status.value}'" ) exchange.status = ExchangeStatus.NO_SHOW exchange.completed_at = datetime.now(UTC) return await self.exchange_repo.update(exchange) def _expand_availability_to_slots( self, avail: Availability, slot_date: date, booked_starts: set[datetime] ) -> list[BookableSlot]: """ Expand an availability block into individual slots, filtering out booked ones. Args: avail: Availability record slot_date: Date for the slots booked_starts: Set of already-booked slot start times Returns: List of available BookableSlot records """ slots: list[BookableSlot] = [] # Start from the availability's start time current_start = datetime.combine(slot_date, avail.start_time, tzinfo=UTC) avail_end = datetime.combine(slot_date, avail.end_time, tzinfo=UTC) while current_start + timedelta(minutes=SLOT_DURATION_MINUTES) <= avail_end: slot_end = current_start + timedelta(minutes=SLOT_DURATION_MINUTES) # Only include if not already booked if current_start not in booked_starts: slots.append(BookableSlot(start_time=current_start, end_time=slot_end)) current_start = slot_end return slots async def get_available_slots(self, date_param: date) -> AvailableSlotsResponse: """ Get available booking slots for a specific date. Returns all slots that: - Fall within admin-defined availability windows - Are not already booked by another user Args: date_param: Date to get slots for Returns: AvailableSlotsResponse with date and list of available slots Raises: BadRequestError: If date is out of range """ validate_date_in_range(date_param, context="book") # Get availability for the date availabilities = await self.availability_repo.get_by_date(date_param) if not availabilities: return AvailableSlotsResponse(date=date_param, slots=[]) # Get already booked slots for the date booked_starts = await self.exchange_repo.get_booked_slots_for_date(date_param) # Expand each availability into slots all_slots: list[BookableSlot] = [] for avail in availabilities: slots = self._expand_availability_to_slots(avail, date_param, booked_starts) all_slots.extend(slots) # Sort by start time all_slots.sort(key=lambda s: s.start_time) return AvailableSlotsResponse(date=date_param, slots=all_slots)