"""Exchange routes for Bitcoin trading.""" from datetime import UTC, date, datetime, time, timedelta from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel from sqlalchemy import and_, desc, select from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import joinedload from auth import require_permission from database import get_db from date_validation import validate_date_in_range from models import ( Availability, Exchange, ExchangeStatus, Permission, PriceHistory, TradeDirection, User, ) from price_fetcher import PAIR_BTC_EUR, SOURCE_BITFINEX, fetch_btc_eur_price from schemas import ( AdminExchangeResponse, ExchangeRequest, ExchangeResponse, ExchangeUserContact, ) from shared_constants import ( EUR_TRADE_INCREMENT, EUR_TRADE_MAX, EUR_TRADE_MIN, PREMIUM_PERCENTAGE, PRICE_STALENESS_SECONDS, SLOT_DURATION_MINUTES, ) router = APIRouter(prefix="/api/exchange", tags=["exchange"]) # ============================================================================= # Constants for satoshi calculations # ============================================================================= SATS_PER_BTC = 100_000_000 # ============================================================================= # Pydantic models for price endpoint # ============================================================================= class ExchangeConfigResponse(BaseModel): """Exchange configuration for the frontend.""" eur_min: int eur_max: int eur_increment: int premium_percentage: int class PriceResponse(BaseModel): """Current BTC/EUR price with premium applied.""" market_price: float # Raw price from exchange agreed_price: float # Price with premium applied premium_percentage: int timestamp: datetime is_stale: bool class ExchangePriceResponse(BaseModel): """Combined price and configuration response.""" price: PriceResponse | None # None if price fetch failed config: ExchangeConfigResponse error: str | None = None # ============================================================================= # Helper functions # ============================================================================= def apply_premium_for_direction( market_price: float, premium_percentage: int, direction: TradeDirection, ) -> float: """ Apply premium to market price based on trade direction. The premium is always favorable to the admin: - When user BUYS BTC: user pays MORE (market * (1 + premium/100)) - When user SELLS BTC: user receives LESS (market * (1 - premium/100)) """ if direction == TradeDirection.BUY: return market_price * (1 + premium_percentage / 100) else: # SELL return market_price * (1 - premium_percentage / 100) def apply_premium(market_price: float, premium_percentage: int) -> float: """Apply buy-side premium (for price display).""" return apply_premium_for_direction( market_price, premium_percentage, TradeDirection.BUY ) def calculate_sats_amount( eur_cents: int, price_eur_per_btc: float, ) -> int: """ Calculate satoshi amount from EUR cents and price. Args: eur_cents: Amount in EUR cents (e.g., 10000 = €100) price_eur_per_btc: Price in EUR per BTC Returns: Amount in satoshis """ eur_amount = eur_cents / 100 btc_amount = eur_amount / price_eur_per_btc return int(btc_amount * SATS_PER_BTC) async def get_latest_price(db: AsyncSession) -> PriceHistory | None: """Get the most recent price from the database.""" query = ( select(PriceHistory) .where( PriceHistory.source == SOURCE_BITFINEX, PriceHistory.pair == PAIR_BTC_EUR ) .order_by(desc(PriceHistory.timestamp)) .limit(1) ) result = await db.execute(query) return result.scalar_one_or_none() def is_price_stale(price_timestamp: datetime) -> bool: """Check if a price is older than the staleness threshold.""" age_seconds = (datetime.now(UTC) - price_timestamp).total_seconds() return age_seconds > PRICE_STALENESS_SECONDS def _to_exchange_response( exchange: Exchange, user_email: str | None = None, ) -> ExchangeResponse: """Convert an Exchange model to ExchangeResponse schema.""" email = user_email if user_email is not None else exchange.user.email return ExchangeResponse( id=exchange.id, user_id=exchange.user_id, user_email=email, slot_start=exchange.slot_start, slot_end=exchange.slot_end, direction=exchange.direction.value, eur_amount=exchange.eur_amount, sats_amount=exchange.sats_amount, market_price_eur=exchange.market_price_eur, agreed_price_eur=exchange.agreed_price_eur, premium_percentage=exchange.premium_percentage, status=exchange.status.value, created_at=exchange.created_at, cancelled_at=exchange.cancelled_at, completed_at=exchange.completed_at, ) # ============================================================================= # Price Endpoint # ============================================================================= @router.get("/price", response_model=ExchangePriceResponse) async def get_exchange_price( db: AsyncSession = Depends(get_db), _current_user: User = Depends(require_permission(Permission.CREATE_EXCHANGE)), ) -> ExchangePriceResponse: """ Get the current BTC/EUR price for trading. Returns the latest price from the database. If no price exists or the price is stale, attempts to fetch a fresh price from Bitfinex. The response includes: - market_price: The raw price from the exchange - agreed_price: The price with admin premium applied - is_stale: Whether the price is older than 5 minutes - config: Trading configuration (min/max EUR, increment) """ config = ExchangeConfigResponse( eur_min=EUR_TRADE_MIN, eur_max=EUR_TRADE_MAX, eur_increment=EUR_TRADE_INCREMENT, premium_percentage=PREMIUM_PERCENTAGE, ) # Try to get the latest cached price cached_price = await get_latest_price(db) # If no cached price or it's stale, try to fetch a new one if cached_price is None or is_price_stale(cached_price.timestamp): try: price_value, timestamp = await fetch_btc_eur_price() # Store the new price new_price = PriceHistory( source=SOURCE_BITFINEX, pair=PAIR_BTC_EUR, price=price_value, timestamp=timestamp, ) db.add(new_price) await db.commit() await db.refresh(new_price) return ExchangePriceResponse( price=PriceResponse( market_price=price_value, agreed_price=apply_premium(price_value, PREMIUM_PERCENTAGE), premium_percentage=PREMIUM_PERCENTAGE, timestamp=timestamp, is_stale=False, ), config=config, ) except Exception as e: # If fetch fails and we have a cached price, return it with stale flag if cached_price is not None: return ExchangePriceResponse( price=PriceResponse( market_price=cached_price.price, agreed_price=apply_premium( cached_price.price, PREMIUM_PERCENTAGE ), premium_percentage=PREMIUM_PERCENTAGE, timestamp=cached_price.timestamp, is_stale=True, ), config=config, error=f"Failed to fetch fresh price: {e}", ) # No cached price and fetch failed return ExchangePriceResponse( price=None, config=config, error=f"Price unavailable: {e}", ) # Return the cached price (not stale) return ExchangePriceResponse( price=PriceResponse( market_price=cached_price.price, agreed_price=apply_premium(cached_price.price, PREMIUM_PERCENTAGE), premium_percentage=PREMIUM_PERCENTAGE, timestamp=cached_price.timestamp, is_stale=is_price_stale(cached_price.timestamp), ), config=config, ) # ============================================================================= # Create Exchange Endpoint # ============================================================================= @router.post("", response_model=ExchangeResponse) async def create_exchange( request: ExchangeRequest, db: AsyncSession = Depends(get_db), current_user: User = Depends(require_permission(Permission.CREATE_EXCHANGE)), ) -> ExchangeResponse: """ Create a new exchange trade booking. Validates: - Slot is on a valid date and time boundary - Slot is within admin availability - Slot is not already booked - Price is not stale - EUR amount is within configured limits """ slot_date = request.slot_start.date() validate_date_in_range(slot_date, context="book") # Validate direction try: direction = TradeDirection(request.direction) except ValueError: raise HTTPException( status_code=400, detail=f"Invalid direction: {request.direction}. Must be 'buy' or 'sell'.", ) from None # Validate EUR amount if request.eur_amount < EUR_TRADE_MIN * 100: raise HTTPException( status_code=400, detail=f"EUR amount must be at least €{EUR_TRADE_MIN}", ) if request.eur_amount > EUR_TRADE_MAX * 100: raise HTTPException( status_code=400, detail=f"EUR amount must be at most €{EUR_TRADE_MAX}", ) if request.eur_amount % (EUR_TRADE_INCREMENT * 100) != 0: raise HTTPException( status_code=400, detail=f"EUR amount must be a multiple of €{EUR_TRADE_INCREMENT}", ) # Validate slot timing valid_minutes = (0, 15, 30, 45) if request.slot_start.minute not in valid_minutes: raise HTTPException( status_code=400, detail=f"Slot must be on {SLOT_DURATION_MINUTES}-minute boundary", ) if request.slot_start.second != 0 or request.slot_start.microsecond != 0: raise HTTPException( status_code=400, detail="Slot start time must not have seconds or microseconds", ) # Verify slot falls within availability slot_start_time = request.slot_start.time() slot_end_dt = request.slot_start + timedelta(minutes=SLOT_DURATION_MINUTES) slot_end_time = slot_end_dt.time() result = await db.execute( select(Availability).where( and_( Availability.date == slot_date, Availability.start_time <= slot_start_time, Availability.end_time >= slot_end_time, ) ) ) matching_availability = result.scalar_one_or_none() if not matching_availability: slot_str = request.slot_start.strftime("%Y-%m-%d %H:%M") raise HTTPException( status_code=400, detail=f"Selected slot at {slot_str} UTC is not available", ) # Get and validate price cached_price = await get_latest_price(db) if cached_price is None: raise HTTPException( status_code=503, detail="Price data unavailable. Please try again later.", ) if is_price_stale(cached_price.timestamp): raise HTTPException( status_code=503, detail="Price is stale. Please refresh and try again.", ) # Calculate agreed price based on direction market_price = cached_price.price agreed_price = apply_premium_for_direction( market_price, PREMIUM_PERCENTAGE, direction ) # Calculate sats amount based on agreed price sats_amount = calculate_sats_amount(request.eur_amount, agreed_price) # Create the exchange exchange = Exchange( user_id=current_user.id, slot_start=request.slot_start, slot_end=slot_end_dt, direction=direction, eur_amount=request.eur_amount, sats_amount=sats_amount, market_price_eur=market_price, agreed_price_eur=agreed_price, premium_percentage=PREMIUM_PERCENTAGE, status=ExchangeStatus.BOOKED, ) db.add(exchange) try: await db.commit() await db.refresh(exchange) except IntegrityError: await db.rollback() raise HTTPException( status_code=409, detail="This slot has already been booked. Select another slot.", ) from None return _to_exchange_response(exchange, current_user.email) # ============================================================================= # User's Exchanges Endpoints # ============================================================================= trades_router = APIRouter(prefix="/api/trades", tags=["trades"]) @trades_router.get("", response_model=list[ExchangeResponse]) async def get_my_trades( db: AsyncSession = Depends(get_db), current_user: User = Depends(require_permission(Permission.VIEW_OWN_EXCHANGES)), ) -> list[ExchangeResponse]: """Get the current user's exchanges, sorted by date (newest first).""" result = await db.execute( select(Exchange) .where(Exchange.user_id == current_user.id) .order_by(Exchange.slot_start.desc()) ) exchanges = result.scalars().all() return [_to_exchange_response(ex, current_user.email) for ex in exchanges] @trades_router.post("/{exchange_id}/cancel", response_model=ExchangeResponse) async def cancel_my_trade( exchange_id: int, db: AsyncSession = Depends(get_db), current_user: User = Depends(require_permission(Permission.CANCEL_OWN_EXCHANGE)), ) -> ExchangeResponse: """Cancel one of the current user's exchanges.""" # Get the exchange with eager loading of user relationship result = await db.execute( select(Exchange) .options(joinedload(Exchange.user)) .where(Exchange.id == exchange_id) ) exchange = result.scalar_one_or_none() if not exchange: raise HTTPException( status_code=404, detail=f"Trade {exchange_id} not found", ) # Verify ownership if exchange.user_id != current_user.id: raise HTTPException( status_code=403, detail="Cannot cancel another user's trade", ) # Check if already in a final state if exchange.status != ExchangeStatus.BOOKED: raise HTTPException( status_code=400, detail=f"Cannot cancel: status is '{exchange.status.value}'", ) # Cancel the exchange (no time restriction per spec) exchange.status = ExchangeStatus.CANCELLED_BY_USER exchange.cancelled_at = datetime.now(UTC) await db.commit() await db.refresh(exchange) return _to_exchange_response(exchange, current_user.email) # ============================================================================= # Admin Exchanges Endpoints # ============================================================================= admin_trades_router = APIRouter(prefix="/api/admin/trades", tags=["admin-trades"]) def _to_admin_exchange_response(exchange: Exchange) -> AdminExchangeResponse: """Convert an Exchange model to AdminExchangeResponse with user contact.""" user = exchange.user return AdminExchangeResponse( id=exchange.id, user_id=exchange.user_id, user_email=user.email, user_contact=ExchangeUserContact( email=user.email, contact_email=user.contact_email, telegram=user.telegram, signal=user.signal, nostr_npub=user.nostr_npub, ), slot_start=exchange.slot_start, slot_end=exchange.slot_end, direction=exchange.direction.value, eur_amount=exchange.eur_amount, sats_amount=exchange.sats_amount, market_price_eur=exchange.market_price_eur, agreed_price_eur=exchange.agreed_price_eur, premium_percentage=exchange.premium_percentage, status=exchange.status.value, created_at=exchange.created_at, cancelled_at=exchange.cancelled_at, completed_at=exchange.completed_at, ) @admin_trades_router.get("/upcoming", response_model=list[AdminExchangeResponse]) async def get_upcoming_trades( db: AsyncSession = Depends(get_db), _current_user: User = Depends(require_permission(Permission.VIEW_ALL_EXCHANGES)), ) -> list[AdminExchangeResponse]: """Get all upcoming booked trades, sorted by slot time ascending.""" now = datetime.now(UTC) result = await db.execute( select(Exchange) .options(joinedload(Exchange.user)) .where( and_( Exchange.slot_start > now, Exchange.status == ExchangeStatus.BOOKED, ) ) .order_by(Exchange.slot_start.asc()) ) exchanges = result.scalars().all() return [_to_admin_exchange_response(ex) for ex in exchanges] @admin_trades_router.get("/past", response_model=list[AdminExchangeResponse]) async def get_past_trades( status: str | None = None, start_date: date | None = None, end_date: date | None = None, user_search: str | None = None, db: AsyncSession = Depends(get_db), _current_user: User = Depends(require_permission(Permission.VIEW_ALL_EXCHANGES)), ) -> list[AdminExchangeResponse]: """ Get past trades with optional filters. Filters: - status: Filter by exchange status - start_date, end_date: Filter by slot_start date range - user_search: Search by user email (partial match) """ now = datetime.now(UTC) # Start with base query for past trades (slot_start <= now OR not booked) query = ( select(Exchange) .options(joinedload(Exchange.user)) .where( (Exchange.slot_start <= now) | (Exchange.status != ExchangeStatus.BOOKED) ) ) # Apply status filter if status: try: status_enum = ExchangeStatus(status) query = query.where(Exchange.status == status_enum) except ValueError: raise HTTPException( status_code=400, detail=f"Invalid status: {status}", ) from None # Apply date range filter if start_date: start_dt = datetime.combine(start_date, time.min, tzinfo=UTC) query = query.where(Exchange.slot_start >= start_dt) if end_date: end_dt = datetime.combine(end_date, time.max, tzinfo=UTC) query = query.where(Exchange.slot_start <= end_dt) # Apply user search filter (join with User table) if user_search: query = query.join(Exchange.user).where(User.email.ilike(f"%{user_search}%")) # Order by most recent first query = query.order_by(Exchange.slot_start.desc()) result = await db.execute(query) exchanges = result.scalars().all() return [_to_admin_exchange_response(ex) for ex in exchanges] @admin_trades_router.post( "/{exchange_id}/complete", response_model=AdminExchangeResponse ) async def complete_trade( exchange_id: int, db: AsyncSession = Depends(get_db), _current_user: User = Depends(require_permission(Permission.CANCEL_ANY_EXCHANGE)), ) -> AdminExchangeResponse: """Mark a trade as completed. Only possible after slot time has passed.""" result = await db.execute( select(Exchange) .options(joinedload(Exchange.user)) .where(Exchange.id == exchange_id) ) exchange = result.scalar_one_or_none() if not exchange: raise HTTPException( status_code=404, detail=f"Trade {exchange_id} not found", ) # Check slot has passed if exchange.slot_start > datetime.now(UTC): raise HTTPException( status_code=400, detail="Cannot complete: trade slot has not yet started", ) # Check status is BOOKED if exchange.status != ExchangeStatus.BOOKED: raise HTTPException( status_code=400, detail=f"Cannot complete: status is '{exchange.status.value}'", ) exchange.status = ExchangeStatus.COMPLETED exchange.completed_at = datetime.now(UTC) await db.commit() await db.refresh(exchange) return _to_admin_exchange_response(exchange) @admin_trades_router.post( "/{exchange_id}/no-show", response_model=AdminExchangeResponse ) async def mark_no_show( exchange_id: int, db: AsyncSession = Depends(get_db), _current_user: User = Depends(require_permission(Permission.CANCEL_ANY_EXCHANGE)), ) -> AdminExchangeResponse: """Mark a trade as no-show. Only possible after slot time has passed.""" result = await db.execute( select(Exchange) .options(joinedload(Exchange.user)) .where(Exchange.id == exchange_id) ) exchange = result.scalar_one_or_none() if not exchange: raise HTTPException( status_code=404, detail=f"Trade {exchange_id} not found", ) # Check slot has passed if exchange.slot_start > datetime.now(UTC): raise HTTPException( status_code=400, detail="Cannot mark as no-show: trade slot has not yet started", ) # Check status is BOOKED if exchange.status != ExchangeStatus.BOOKED: raise HTTPException( status_code=400, detail=f"Cannot mark as no-show: status is '{exchange.status.value}'", ) exchange.status = ExchangeStatus.NO_SHOW exchange.completed_at = datetime.now(UTC) await db.commit() await db.refresh(exchange) return _to_admin_exchange_response(exchange) @admin_trades_router.post("/{exchange_id}/cancel", response_model=AdminExchangeResponse) async def admin_cancel_trade( exchange_id: int, db: AsyncSession = Depends(get_db), _current_user: User = Depends(require_permission(Permission.CANCEL_ANY_EXCHANGE)), ) -> AdminExchangeResponse: """Cancel any trade (admin only).""" result = await db.execute( select(Exchange) .options(joinedload(Exchange.user)) .where(Exchange.id == exchange_id) ) exchange = result.scalar_one_or_none() if not exchange: raise HTTPException( status_code=404, detail=f"Trade {exchange_id} not found", ) # Check status is BOOKED if exchange.status != ExchangeStatus.BOOKED: raise HTTPException( status_code=400, detail=f"Cannot cancel: status is '{exchange.status.value}'", ) exchange.status = ExchangeStatus.CANCELLED_BY_ADMIN exchange.cancelled_at = datetime.now(UTC) await db.commit() await db.refresh(exchange) return _to_admin_exchange_response(exchange) # All routers from this module for easy registration routers = [router, trades_router, admin_trades_router]