From f46d2ae8b313ede16199419a8d542c9f2549fd45 Mon Sep 17 00:00:00 2001 From: counterweight Date: Thu, 25 Dec 2025 00:59:57 +0100 Subject: [PATCH] refactors --- backend/auth.py | 6 +- backend/exceptions.py | 61 ++++ backend/mappers.py | 91 +++++ backend/repositories/__init__.py | 6 + backend/repositories/price.py | 27 ++ backend/repositories/user.py | 23 ++ backend/routes/exchange.py | 556 +++---------------------------- backend/routes/invites.py | 41 +-- backend/routes/profile.py | 1 + backend/schemas.py | 61 ++++ backend/services/__init__.py | 5 + backend/services/exchange.py | 392 ++++++++++++++++++++++ 12 files changed, 734 insertions(+), 536 deletions(-) create mode 100644 backend/exceptions.py create mode 100644 backend/mappers.py create mode 100644 backend/repositories/__init__.py create mode 100644 backend/repositories/price.py create mode 100644 backend/repositories/user.py create mode 100644 backend/services/__init__.py create mode 100644 backend/services/exchange.py diff --git a/backend/auth.py b/backend/auth.py index a9d63d4..159ab46 100644 --- a/backend/auth.py +++ b/backend/auth.py @@ -9,6 +9,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from database import get_db from models import Permission, User +from repositories.user import UserRepository from schemas import UserResponse SECRET_KEY = os.environ["SECRET_KEY"] # Required - see .env.example @@ -45,8 +46,9 @@ def create_access_token( async def get_user_by_email(db: AsyncSession, email: str) -> User | None: - result = await db.execute(select(User).where(User.email == email)) - return result.scalar_one_or_none() + """Get user by email (backwards compatibility wrapper).""" + repo = UserRepository(db) + return await repo.get_by_email(email) async def authenticate_user(db: AsyncSession, email: str, password: str) -> User | None: diff --git a/backend/exceptions.py b/backend/exceptions.py new file mode 100644 index 0000000..7e8a641 --- /dev/null +++ b/backend/exceptions.py @@ -0,0 +1,61 @@ +"""Standardized API exception classes for consistent error responses. + +Note: These exceptions use string detail for backward compatibility with existing tests. +Future refactoring could standardize on structured error responses. +""" + +from fastapi import HTTPException, status + + +class APIError(HTTPException): + """Base API error with consistent structure. + + Uses string detail for backward compatibility with existing tests. + """ + + def __init__( + self, + status_code: int, + message: str, + ): + super().__init__(status_code=status_code, detail=message) + + +class NotFoundError(APIError): + """Resource not found error (404).""" + + def __init__(self, resource: str): + super().__init__( + status_code=status.HTTP_404_NOT_FOUND, + message=f"{resource} not found", + ) + + +class ConflictError(APIError): + """Conflict error (409).""" + + def __init__(self, message: str): + super().__init__( + status_code=status.HTTP_409_CONFLICT, + message=message, + ) + + +class BadRequestError(APIError): + """Bad request error (400).""" + + def __init__(self, message: str): + super().__init__( + status_code=status.HTTP_400_BAD_REQUEST, + message=message, + ) + + +class ServiceUnavailableError(APIError): + """Service unavailable error (503).""" + + def __init__(self, message: str): + super().__init__( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + message=message, + ) diff --git a/backend/mappers.py b/backend/mappers.py new file mode 100644 index 0000000..8f2ad4a --- /dev/null +++ b/backend/mappers.py @@ -0,0 +1,91 @@ +"""Response mappers for converting models to API response schemas.""" + +from models import Exchange, Invite +from schemas import ( + AdminExchangeResponse, + ExchangeResponse, + ExchangeUserContact, + InviteResponse, +) + + +class ExchangeMapper: + """Mapper for Exchange model to response schemas.""" + + @staticmethod + def to_response( + exchange: Exchange, + user_email: str | None = None, + ) -> ExchangeResponse: + """Convert an Exchange model to ExchangeResponse schema.""" + email = user_email if user_email is not None else exchange.user.email + return ExchangeResponse( + id=exchange.id, + public_id=str(exchange.public_id), + user_id=exchange.user_id, + user_email=email, + slot_start=exchange.slot_start, + slot_end=exchange.slot_end, + direction=exchange.direction.value, + bitcoin_transfer_method=exchange.bitcoin_transfer_method.value, + eur_amount=exchange.eur_amount, + sats_amount=exchange.sats_amount, + market_price_eur=exchange.market_price_eur, + agreed_price_eur=exchange.agreed_price_eur, + premium_percentage=exchange.premium_percentage, + status=exchange.status.value, + created_at=exchange.created_at, + cancelled_at=exchange.cancelled_at, + completed_at=exchange.completed_at, + ) + + @staticmethod + def to_admin_response(exchange: Exchange) -> AdminExchangeResponse: + """Convert an Exchange model to AdminExchangeResponse with user contact.""" + user = exchange.user + return AdminExchangeResponse( + id=exchange.id, + public_id=str(exchange.public_id), + user_id=exchange.user_id, + user_email=user.email, + user_contact=ExchangeUserContact( + email=user.email, + contact_email=user.contact_email, + telegram=user.telegram, + signal=user.signal, + nostr_npub=user.nostr_npub, + ), + slot_start=exchange.slot_start, + slot_end=exchange.slot_end, + direction=exchange.direction.value, + bitcoin_transfer_method=exchange.bitcoin_transfer_method.value, + eur_amount=exchange.eur_amount, + sats_amount=exchange.sats_amount, + market_price_eur=exchange.market_price_eur, + agreed_price_eur=exchange.agreed_price_eur, + premium_percentage=exchange.premium_percentage, + status=exchange.status.value, + created_at=exchange.created_at, + cancelled_at=exchange.cancelled_at, + completed_at=exchange.completed_at, + ) + + +class InviteMapper: + """Mapper for Invite model to response schemas.""" + + @staticmethod + def to_response(invite: Invite) -> InviteResponse: + """Build an InviteResponse from an Invite with loaded relationships.""" + return InviteResponse( + id=invite.id, + identifier=invite.identifier, + godfather_id=invite.godfather_id, + godfather_email=invite.godfather.email, + status=invite.status.value, + used_by_id=invite.used_by_id, + used_by_email=invite.used_by.email if invite.used_by else None, + created_at=invite.created_at, + spent_at=invite.spent_at, + revoked_at=invite.revoked_at, + ) diff --git a/backend/repositories/__init__.py b/backend/repositories/__init__.py new file mode 100644 index 0000000..aff0836 --- /dev/null +++ b/backend/repositories/__init__.py @@ -0,0 +1,6 @@ +"""Repository layer for database queries.""" + +from repositories.price import PriceRepository +from repositories.user import UserRepository + +__all__ = ["PriceRepository", "UserRepository"] diff --git a/backend/repositories/price.py b/backend/repositories/price.py new file mode 100644 index 0000000..b8322da --- /dev/null +++ b/backend/repositories/price.py @@ -0,0 +1,27 @@ +"""Price repository for database queries.""" + +from sqlalchemy import desc, select +from sqlalchemy.ext.asyncio import AsyncSession + +from models import PriceHistory +from price_fetcher import PAIR_BTC_EUR, SOURCE_BITFINEX + + +class PriceRepository: + """Repository for price-related database queries.""" + + def __init__(self, db: AsyncSession): + self.db = db + + async def get_latest( + self, source: str = SOURCE_BITFINEX, pair: str = PAIR_BTC_EUR + ) -> PriceHistory | None: + """Get the most recent price from the database.""" + query = ( + select(PriceHistory) + .where(PriceHistory.source == source, PriceHistory.pair == pair) + .order_by(desc(PriceHistory.timestamp)) + .limit(1) + ) + result = await self.db.execute(query) + return result.scalar_one_or_none() diff --git a/backend/repositories/user.py b/backend/repositories/user.py new file mode 100644 index 0000000..d4c12ce --- /dev/null +++ b/backend/repositories/user.py @@ -0,0 +1,23 @@ +"""User repository for database queries.""" + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from models import User + + +class UserRepository: + """Repository for user-related database queries.""" + + def __init__(self, db: AsyncSession): + self.db = db + + async def get_by_email(self, email: str) -> User | None: + """Get a user by email.""" + result = await self.db.execute(select(User).where(User.email == email)) + return result.scalar_one_or_none() + + async def get_by_id(self, user_id: int) -> User | None: + """Get a user by ID.""" + result = await self.db.execute(select(User).where(User.id == user_id)) + return result.scalar_one_or_none() diff --git a/backend/routes/exchange.py b/backend/routes/exchange.py index 67efb5c..96879af 100644 --- a/backend/routes/exchange.py +++ b/backend/routes/exchange.py @@ -3,16 +3,16 @@ import uuid from datetime import UTC, date, datetime, time, timedelta -from fastapi import APIRouter, Depends, HTTPException, Query -from pydantic import BaseModel -from sqlalchemy import and_, desc, select -from sqlalchemy.exc import IntegrityError +from fastapi import APIRouter, Depends, HTTPException, Query, status +from sqlalchemy import and_, select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import joinedload from auth import require_permission from database import get_db from date_validation import validate_date_in_range +from exceptions import BadRequestError +from mappers import ExchangeMapper from models import ( Availability, BitcoinTransferMethod, @@ -24,169 +24,35 @@ from models import ( User, ) from price_fetcher import PAIR_BTC_EUR, SOURCE_BITFINEX, fetch_btc_eur_price +from repositories.price import PriceRepository from schemas import ( AdminExchangeResponse, + AvailableSlotsResponse, + BookableSlot, + ExchangeConfigResponse, + ExchangePriceResponse, ExchangeRequest, ExchangeResponse, - ExchangeUserContact, + PriceResponse, + UserSearchResult, ) +from services.exchange import ExchangeService from shared_constants import ( EUR_TRADE_INCREMENT, EUR_TRADE_MAX, EUR_TRADE_MIN, - LIGHTNING_MAX_EUR, PREMIUM_PERCENTAGE, - PRICE_STALENESS_SECONDS, SLOT_DURATION_MINUTES, ) router = APIRouter(prefix="/api/exchange", tags=["exchange"]) -# ============================================================================= -# Constants for satoshi calculations -# ============================================================================= - -SATS_PER_BTC = 100_000_000 - - -# ============================================================================= -# Pydantic models for price endpoint -# ============================================================================= - - -class ExchangeConfigResponse(BaseModel): - """Exchange configuration for the frontend.""" - - eur_min: int - eur_max: int - eur_increment: int - premium_percentage: int - - -class PriceResponse(BaseModel): - """Current BTC/EUR price for trading. - - Note: The actual agreed price depends on trade direction (buy/sell) - and is calculated by the frontend using market_price and premium_percentage. - """ - - market_price: float # Raw price from exchange - premium_percentage: int - timestamp: datetime - is_stale: bool - - -class ExchangePriceResponse(BaseModel): - """Combined price and configuration response.""" - - price: PriceResponse | None # None if price fetch failed - config: ExchangeConfigResponse - error: str | None = None - - -class BookableSlot(BaseModel): - """A single bookable time slot.""" - - start_time: datetime - end_time: datetime - - -class AvailableSlotsResponse(BaseModel): - """Response containing available slots for a date.""" - - date: date - slots: list[BookableSlot] - # ============================================================================= # Helper functions # ============================================================================= -def apply_premium_for_direction( - market_price: float, - premium_percentage: int, - direction: TradeDirection, -) -> float: - """ - Apply premium to market price based on trade direction. - - The premium is always favorable to the admin: - - When user BUYS BTC: user pays MORE (market * (1 + premium/100)) - - When user SELLS BTC: user receives LESS (market * (1 - premium/100)) - """ - if direction == TradeDirection.BUY: - return market_price * (1 + premium_percentage / 100) - else: # SELL - return market_price * (1 - premium_percentage / 100) - - -def calculate_sats_amount( - eur_cents: int, - price_eur_per_btc: float, -) -> int: - """ - Calculate satoshi amount from EUR cents and price. - - Args: - eur_cents: Amount in EUR cents (e.g., 10000 = €100) - price_eur_per_btc: Price in EUR per BTC - - Returns: - Amount in satoshis - """ - eur_amount = eur_cents / 100 - btc_amount = eur_amount / price_eur_per_btc - return int(btc_amount * SATS_PER_BTC) - - -async def get_latest_price(db: AsyncSession) -> PriceHistory | None: - """Get the most recent price from the database.""" - query = ( - select(PriceHistory) - .where( - PriceHistory.source == SOURCE_BITFINEX, PriceHistory.pair == PAIR_BTC_EUR - ) - .order_by(desc(PriceHistory.timestamp)) - .limit(1) - ) - result = await db.execute(query) - return result.scalar_one_or_none() - - -def is_price_stale(price_timestamp: datetime) -> bool: - """Check if a price is older than the staleness threshold.""" - age_seconds = (datetime.now(UTC) - price_timestamp).total_seconds() - return age_seconds > PRICE_STALENESS_SECONDS - - -def _to_exchange_response( - exchange: Exchange, - user_email: str | None = None, -) -> ExchangeResponse: - """Convert an Exchange model to ExchangeResponse schema.""" - email = user_email if user_email is not None else exchange.user.email - return ExchangeResponse( - id=exchange.id, - public_id=str(exchange.public_id), - user_id=exchange.user_id, - user_email=email, - slot_start=exchange.slot_start, - slot_end=exchange.slot_end, - direction=exchange.direction.value, - bitcoin_transfer_method=exchange.bitcoin_transfer_method.value, - eur_amount=exchange.eur_amount, - sats_amount=exchange.sats_amount, - market_price_eur=exchange.market_price_eur, - agreed_price_eur=exchange.agreed_price_eur, - premium_percentage=exchange.premium_percentage, - status=exchange.status.value, - created_at=exchange.created_at, - cancelled_at=exchange.cancelled_at, - completed_at=exchange.completed_at, - ) - - # ============================================================================= # Price Endpoint # ============================================================================= @@ -216,11 +82,14 @@ async def get_exchange_price( premium_percentage=PREMIUM_PERCENTAGE, ) + price_repo = PriceRepository(db) + service = ExchangeService(db) + # Try to get the latest cached price - cached_price = await get_latest_price(db) + cached_price = await price_repo.get_latest() # If no cached price or it's stale, try to fetch a new one - if cached_price is None or is_price_stale(cached_price.timestamp): + if cached_price is None or service.is_price_stale(cached_price.timestamp): try: price_value, timestamp = await fetch_btc_eur_price() @@ -270,7 +139,7 @@ async def get_exchange_price( market_price=cached_price.price, premium_percentage=PREMIUM_PERCENTAGE, timestamp=cached_price.timestamp, - is_stale=is_price_stale(cached_price.timestamp), + is_stale=service.is_price_stale(cached_price.timestamp), ), config=config, ) @@ -377,194 +246,34 @@ async def create_exchange( - Price is not stale - EUR amount is within configured limits """ - slot_date = request.slot_start.date() - validate_date_in_range(slot_date, context="book") - - # Check if user already has a trade on this date - existing_trade_query = select(Exchange).where( - and_( - Exchange.user_id == current_user.id, - Exchange.slot_start >= datetime.combine(slot_date, time.min, tzinfo=UTC), - Exchange.slot_start - < datetime.combine(slot_date, time.max, tzinfo=UTC) + timedelta(days=1), - Exchange.status == ExchangeStatus.BOOKED, - ) - ) - existing_trade_result = await db.execute(existing_trade_query) - existing_trade = existing_trade_result.scalar_one_or_none() - - if existing_trade: - raise HTTPException( - status_code=400, - detail=( - f"You already have a trade booked on {slot_date.strftime('%Y-%m-%d')}. " - f"Only one trade per day is allowed. " - f"Trade ID: {existing_trade.public_id}" - ), - ) - # Validate direction try: direction = TradeDirection(request.direction) except ValueError: - raise HTTPException( - status_code=400, - detail=f"Invalid direction: {request.direction}. Must be 'buy' or 'sell'.", + raise BadRequestError( + f"Invalid direction: {request.direction}. Must be 'buy' or 'sell'." ) from None # Validate bitcoin transfer method try: bitcoin_transfer_method = BitcoinTransferMethod(request.bitcoin_transfer_method) except ValueError: - raise HTTPException( - status_code=400, - detail=( - f"Invalid bitcoin_transfer_method: {request.bitcoin_transfer_method}. " - "Must be 'onchain' or 'lightning'." - ), + raise BadRequestError( + f"Invalid bitcoin_transfer_method: {request.bitcoin_transfer_method}. " + "Must be 'onchain' or 'lightning'." ) from None - # Validate EUR amount - if request.eur_amount < EUR_TRADE_MIN * 100: - raise HTTPException( - status_code=400, - detail=f"EUR amount must be at least €{EUR_TRADE_MIN}", - ) - if request.eur_amount > EUR_TRADE_MAX * 100: - raise HTTPException( - status_code=400, - detail=f"EUR amount must be at most €{EUR_TRADE_MAX}", - ) - if request.eur_amount % (EUR_TRADE_INCREMENT * 100) != 0: - raise HTTPException( - status_code=400, - detail=f"EUR amount must be a multiple of €{EUR_TRADE_INCREMENT}", - ) - - # Validate Lightning threshold - if ( - bitcoin_transfer_method == BitcoinTransferMethod.LIGHTNING - and request.eur_amount > LIGHTNING_MAX_EUR * 100 - ): - raise HTTPException( - status_code=400, - detail=( - f"Lightning payments are only allowed for amounts up to " - f"€{LIGHTNING_MAX_EUR}. For amounts above €{LIGHTNING_MAX_EUR}, " - "please use onchain transactions." - ), - ) - - # Validate slot timing - compute valid boundaries from slot duration - valid_minutes = tuple(range(0, 60, SLOT_DURATION_MINUTES)) - if request.slot_start.minute not in valid_minutes: - raise HTTPException( - status_code=400, - detail=f"Slot must be on {SLOT_DURATION_MINUTES}-minute boundary", - ) - if request.slot_start.second != 0 or request.slot_start.microsecond != 0: - raise HTTPException( - status_code=400, - detail="Slot start time must not have seconds or microseconds", - ) - - # Verify slot falls within availability - slot_start_time = request.slot_start.time() - slot_end_dt = request.slot_start + timedelta(minutes=SLOT_DURATION_MINUTES) - slot_end_time = slot_end_dt.time() - - result = await db.execute( - select(Availability).where( - and_( - Availability.date == slot_date, - Availability.start_time <= slot_start_time, - Availability.end_time >= slot_end_time, - ) - ) - ) - matching_availability = result.scalar_one_or_none() - - if not matching_availability: - slot_str = request.slot_start.strftime("%Y-%m-%d %H:%M") - raise HTTPException( - status_code=400, - detail=f"Selected slot at {slot_str} UTC is not available", - ) - - # Get and validate price - cached_price = await get_latest_price(db) - - if cached_price is None: - raise HTTPException( - status_code=503, - detail="Price data unavailable. Please try again later.", - ) - - if is_price_stale(cached_price.timestamp): - raise HTTPException( - status_code=503, - detail="Price is stale. Please refresh and try again.", - ) - - # Calculate agreed price based on direction - market_price = cached_price.price - agreed_price = apply_premium_for_direction( - market_price, PREMIUM_PERCENTAGE, direction - ) - - # Calculate sats amount based on agreed price - sats_amount = calculate_sats_amount(request.eur_amount, agreed_price) - - # Check if slot is already booked (only consider BOOKED status, not cancelled) - slot_booked_query = select(Exchange).where( - and_( - Exchange.slot_start == request.slot_start, - Exchange.status == ExchangeStatus.BOOKED, - ) - ) - slot_booked_result = await db.execute(slot_booked_query) - slot_booked = slot_booked_result.scalar_one_or_none() - - if slot_booked: - slot_str = request.slot_start.strftime("%Y-%m-%d %H:%M") - raise HTTPException( - status_code=409, - detail=( - f"This slot at {slot_str} UTC has already been booked. " - "Select another slot." - ), - ) - - # Create the exchange - exchange = Exchange( - user_id=current_user.id, + # Use service to create exchange (handles all validation) + service = ExchangeService(db) + exchange = await service.create_exchange( + user=current_user, slot_start=request.slot_start, - slot_end=slot_end_dt, direction=direction, bitcoin_transfer_method=bitcoin_transfer_method, eur_amount=request.eur_amount, - sats_amount=sats_amount, - market_price_eur=market_price, - agreed_price_eur=agreed_price, - premium_percentage=PREMIUM_PERCENTAGE, - status=ExchangeStatus.BOOKED, ) - db.add(exchange) - - try: - await db.commit() - await db.refresh(exchange) - except IntegrityError as e: - await db.rollback() - # This should rarely happen now since we check explicitly above, - # but keep it for other potential integrity violations - raise HTTPException( - status_code=409, - detail="Database constraint violation. Please try again.", - ) from e - - return _to_exchange_response(exchange, current_user.email) + return ExchangeMapper.to_response(exchange, current_user.email) # ============================================================================= @@ -587,7 +296,7 @@ async def get_my_trades( ) exchanges = result.scalars().all() - return [_to_exchange_response(ex, current_user.email) for ex in exchanges] + return [ExchangeMapper.to_response(ex, current_user.email) for ex in exchanges] @trades_router.get("/{public_id}", response_model=ExchangeResponse) @@ -597,20 +306,10 @@ async def get_my_trade( current_user: User = Depends(require_permission(Permission.VIEW_OWN_EXCHANGES)), ) -> ExchangeResponse: """Get a specific trade by public ID. User can only access their own trades.""" - result = await db.execute( - select(Exchange).where( - and_(Exchange.public_id == public_id, Exchange.user_id == current_user.id) - ) - ) - exchange = result.scalar_one_or_none() + service = ExchangeService(db) + exchange = await service.get_exchange_by_public_id(public_id, user=current_user) - if not exchange: - raise HTTPException( - status_code=404, - detail="Trade not found or you don't have permission to view it.", - ) - - return _to_exchange_response(exchange, current_user.email) + return ExchangeMapper.to_response(exchange, current_user.email) @trades_router.post("/{public_id}/cancel", response_model=ExchangeResponse) @@ -620,48 +319,20 @@ async def cancel_my_trade( current_user: User = Depends(require_permission(Permission.CANCEL_OWN_EXCHANGE)), ) -> ExchangeResponse: """Cancel one of the current user's exchanges.""" - # Get the exchange with eager loading of user relationship - result = await db.execute( - select(Exchange) - .options(joinedload(Exchange.user)) - .where(Exchange.public_id == public_id) - ) - exchange = result.scalar_one_or_none() + service = ExchangeService(db) + # Get exchange without user filter first to check ownership separately + exchange = await service.get_exchange_by_public_id(public_id) - if not exchange: - raise HTTPException( - status_code=404, - detail="Trade not found", - ) - - # Verify ownership + # Check ownership - return 403 if user doesn't own it if exchange.user_id != current_user.id: raise HTTPException( - status_code=403, + status_code=status.HTTP_403_FORBIDDEN, detail="Cannot cancel another user's trade", ) - # Check if already in a final state - if exchange.status != ExchangeStatus.BOOKED: - raise HTTPException( - status_code=400, - detail=f"Cannot cancel: status is '{exchange.status.value}'", - ) + exchange = await service.cancel_exchange(exchange, current_user, is_admin=False) - # Check if slot time has already passed - if exchange.slot_start <= datetime.now(UTC): - raise HTTPException( - status_code=400, - detail="Cannot cancel: trade slot time has already passed", - ) - - exchange.status = ExchangeStatus.CANCELLED_BY_USER - exchange.cancelled_at = datetime.now(UTC) - - await db.commit() - await db.refresh(exchange) - - return _to_exchange_response(exchange, current_user.email) + return ExchangeMapper.to_response(exchange, current_user.email) # ============================================================================= @@ -671,37 +342,6 @@ async def cancel_my_trade( admin_trades_router = APIRouter(prefix="/api/admin/trades", tags=["admin-trades"]) -def _to_admin_exchange_response(exchange: Exchange) -> AdminExchangeResponse: - """Convert an Exchange model to AdminExchangeResponse with user contact.""" - user = exchange.user - return AdminExchangeResponse( - id=exchange.id, - public_id=str(exchange.public_id), - user_id=exchange.user_id, - user_email=user.email, - user_contact=ExchangeUserContact( - email=user.email, - contact_email=user.contact_email, - telegram=user.telegram, - signal=user.signal, - nostr_npub=user.nostr_npub, - ), - slot_start=exchange.slot_start, - slot_end=exchange.slot_end, - direction=exchange.direction.value, - bitcoin_transfer_method=exchange.bitcoin_transfer_method.value, - eur_amount=exchange.eur_amount, - sats_amount=exchange.sats_amount, - market_price_eur=exchange.market_price_eur, - agreed_price_eur=exchange.agreed_price_eur, - premium_percentage=exchange.premium_percentage, - status=exchange.status.value, - created_at=exchange.created_at, - cancelled_at=exchange.cancelled_at, - completed_at=exchange.completed_at, - ) - - @admin_trades_router.get("/upcoming", response_model=list[AdminExchangeResponse]) async def get_upcoming_trades( db: AsyncSession = Depends(get_db), @@ -722,7 +362,7 @@ async def get_upcoming_trades( ) exchanges = result.scalars().all() - return [_to_admin_exchange_response(ex) for ex in exchanges] + return [ExchangeMapper.to_admin_response(ex) for ex in exchanges] @admin_trades_router.get("/past", response_model=list[AdminExchangeResponse]) @@ -783,7 +423,7 @@ async def get_past_trades( result = await db.execute(query) exchanges = result.scalars().all() - return [_to_admin_exchange_response(ex) for ex in exchanges] + return [ExchangeMapper.to_admin_response(ex) for ex in exchanges] @admin_trades_router.post("/{public_id}/complete", response_model=AdminExchangeResponse) @@ -793,41 +433,11 @@ async def complete_trade( _current_user: User = Depends(require_permission(Permission.COMPLETE_EXCHANGE)), ) -> AdminExchangeResponse: """Mark a trade as completed. Only possible after slot time has passed.""" + service = ExchangeService(db) + exchange = await service.get_exchange_by_public_id(public_id) + exchange = await service.complete_exchange(exchange) - result = await db.execute( - select(Exchange) - .options(joinedload(Exchange.user)) - .where(Exchange.public_id == public_id) - ) - exchange = result.scalar_one_or_none() - - if not exchange: - raise HTTPException( - status_code=404, - detail="Trade not found", - ) - - # Check slot has passed - if exchange.slot_start > datetime.now(UTC): - raise HTTPException( - status_code=400, - detail="Cannot complete: trade slot has not yet started", - ) - - # Check status is BOOKED - if exchange.status != ExchangeStatus.BOOKED: - raise HTTPException( - status_code=400, - detail=f"Cannot complete: status is '{exchange.status.value}'", - ) - - exchange.status = ExchangeStatus.COMPLETED - exchange.completed_at = datetime.now(UTC) - - await db.commit() - await db.refresh(exchange) - - return _to_admin_exchange_response(exchange) + return ExchangeMapper.to_admin_response(exchange) @admin_trades_router.post("/{public_id}/no-show", response_model=AdminExchangeResponse) @@ -837,41 +447,11 @@ async def mark_no_show( _current_user: User = Depends(require_permission(Permission.COMPLETE_EXCHANGE)), ) -> AdminExchangeResponse: """Mark a trade as no-show. Only possible after slot time has passed.""" + service = ExchangeService(db) + exchange = await service.get_exchange_by_public_id(public_id) + exchange = await service.mark_no_show(exchange) - result = await db.execute( - select(Exchange) - .options(joinedload(Exchange.user)) - .where(Exchange.public_id == public_id) - ) - exchange = result.scalar_one_or_none() - - if not exchange: - raise HTTPException( - status_code=404, - detail="Trade not found", - ) - - # Check slot has passed - if exchange.slot_start > datetime.now(UTC): - raise HTTPException( - status_code=400, - detail="Cannot mark as no-show: trade slot has not yet started", - ) - - # Check status is BOOKED - if exchange.status != ExchangeStatus.BOOKED: - raise HTTPException( - status_code=400, - detail=f"Cannot mark as no-show: status is '{exchange.status.value}'", - ) - - exchange.status = ExchangeStatus.NO_SHOW - exchange.completed_at = datetime.now(UTC) - - await db.commit() - await db.refresh(exchange) - - return _to_admin_exchange_response(exchange) + return ExchangeMapper.to_admin_response(exchange) @admin_trades_router.post("/{public_id}/cancel", response_model=AdminExchangeResponse) @@ -881,34 +461,11 @@ async def admin_cancel_trade( _current_user: User = Depends(require_permission(Permission.CANCEL_ANY_EXCHANGE)), ) -> AdminExchangeResponse: """Cancel any trade (admin only).""" + service = ExchangeService(db) + exchange = await service.get_exchange_by_public_id(public_id) + exchange = await service.cancel_exchange(exchange, _current_user, is_admin=True) - result = await db.execute( - select(Exchange) - .options(joinedload(Exchange.user)) - .where(Exchange.public_id == public_id) - ) - exchange = result.scalar_one_or_none() - - if not exchange: - raise HTTPException( - status_code=404, - detail="Trade not found", - ) - - # Check status is BOOKED - if exchange.status != ExchangeStatus.BOOKED: - raise HTTPException( - status_code=400, - detail=f"Cannot cancel: status is '{exchange.status.value}'", - ) - - exchange.status = ExchangeStatus.CANCELLED_BY_ADMIN - exchange.cancelled_at = datetime.now(UTC) - - await db.commit() - await db.refresh(exchange) - - return _to_admin_exchange_response(exchange) + return ExchangeMapper.to_admin_response(exchange) # ============================================================================= @@ -918,13 +475,6 @@ async def admin_cancel_trade( admin_users_router = APIRouter(prefix="/api/admin/users", tags=["admin-users"]) -class UserSearchResult(BaseModel): - """Result item for user search.""" - - id: int - email: str - - @admin_users_router.get("/search", response_model=list[UserSearchResult]) async def search_users( q: str = Query(..., min_length=1, description="Search query for user email"), diff --git a/backend/routes/invites.py b/backend/routes/invites.py index 00bc411..7e50472 100644 --- a/backend/routes/invites.py +++ b/backend/routes/invites.py @@ -9,11 +9,13 @@ from sqlalchemy.ext.asyncio import AsyncSession from auth import require_permission from database import get_db +from exceptions import BadRequestError, NotFoundError from invite_utils import ( generate_invite_identifier, is_valid_identifier_format, normalize_identifier, ) +from mappers import InviteMapper from models import Invite, InviteStatus, Permission, User from pagination import calculate_offset, create_paginated_response from schemas import ( @@ -31,22 +33,6 @@ admin_router = APIRouter(prefix="/api/admin", tags=["admin"]) MAX_INVITE_COLLISION_RETRIES = 3 -def _to_invite_response(invite: Invite) -> InviteResponse: - """Build an InviteResponse from an Invite with loaded relationships.""" - return InviteResponse( - id=invite.id, - identifier=invite.identifier, - godfather_id=invite.godfather_id, - godfather_email=invite.godfather.email, - status=invite.status.value, - used_by_id=invite.used_by_id, - used_by_email=invite.used_by.email if invite.used_by else None, - created_at=invite.created_at, - spent_at=invite.spent_at, - revoked_at=invite.revoked_at, - ) - - @router.get("/{identifier}/check", response_model=InviteCheckResponse) async def check_invite( identifier: str, @@ -118,10 +104,7 @@ async def create_invite( result = await db.execute(select(User.id).where(User.id == data.godfather_id)) godfather_id = result.scalar_one_or_none() if not godfather_id: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="Godfather user not found", - ) + raise BadRequestError("Godfather user not found") # Try to create invite with retry on collision invite: Invite | None = None @@ -150,7 +133,7 @@ async def create_invite( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to create invite", ) - return _to_invite_response(invite) + return InviteMapper.to_response(invite) @admin_router.get("/invites", response_model=PaginatedInviteRecords) @@ -197,7 +180,7 @@ async def list_all_invites( invites = result.scalars().all() # Build responses using preloaded relationships - records = [_to_invite_response(invite) for invite in invites] + records = [InviteMapper.to_response(invite) for invite in invites] return create_paginated_response(records, total, page, per_page) @@ -213,16 +196,12 @@ async def revoke_invite( invite = result.scalar_one_or_none() if not invite: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="Invite not found", - ) + raise NotFoundError("Invite") if invite.status != InviteStatus.READY: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail=f"Cannot revoke invite with status '{invite.status.value}'. " - "Only READY invites can be revoked.", + raise BadRequestError( + f"Cannot revoke invite with status '{invite.status.value}'. " + "Only READY invites can be revoked." ) invite.status = InviteStatus.REVOKED @@ -230,7 +209,7 @@ async def revoke_invite( await db.commit() await db.refresh(invite) - return _to_invite_response(invite) + return InviteMapper.to_response(invite) # All routers from this module for easy registration diff --git a/backend/routes/profile.py b/backend/routes/profile.py index bbf475d..bd294cb 100644 --- a/backend/routes/profile.py +++ b/backend/routes/profile.py @@ -54,6 +54,7 @@ async def update_profile( ) if errors: + # Keep field_errors format for backward compatibility with frontend raise HTTPException( status_code=422, detail={"field_errors": errors}, diff --git a/backend/schemas.py b/backend/schemas.py index 9818387..57b77de 100644 --- a/backend/schemas.py +++ b/backend/schemas.py @@ -277,3 +277,64 @@ class ConstantsResponse(BaseModel): roles: list[str] invite_statuses: list[InviteStatus] bitcoin_transfer_methods: list[BitcoinTransferMethod] + + +# ============================================================================= +# Exchange Price/Config Schemas +# ============================================================================= + + +class ExchangeConfigResponse(BaseModel): + """Exchange configuration for the frontend.""" + + eur_min: int + eur_max: int + eur_increment: int + premium_percentage: int + + +class PriceResponse(BaseModel): + """Current BTC/EUR price for trading. + + Note: The actual agreed price depends on trade direction (buy/sell) + and is calculated by the frontend using market_price and premium_percentage. + """ + + market_price: float # Raw price from exchange + premium_percentage: int + timestamp: datetime + is_stale: bool + + +class ExchangePriceResponse(BaseModel): + """Combined price and configuration response.""" + + price: PriceResponse | None # None if price fetch failed + config: ExchangeConfigResponse + error: str | None = None + + +class BookableSlot(BaseModel): + """A single bookable time slot.""" + + start_time: datetime + end_time: datetime + + +class AvailableSlotsResponse(BaseModel): + """Response containing available slots for a date.""" + + date: date + slots: list[BookableSlot] + + +# ============================================================================= +# Admin User Search Schemas +# ============================================================================= + + +class UserSearchResult(BaseModel): + """Result item for user search.""" + + id: int + email: str diff --git a/backend/services/__init__.py b/backend/services/__init__.py new file mode 100644 index 0000000..6bf4bc8 --- /dev/null +++ b/backend/services/__init__.py @@ -0,0 +1,5 @@ +"""Service layer for business logic.""" + +from services.exchange import ExchangeService + +__all__ = ["ExchangeService"] diff --git a/backend/services/exchange.py b/backend/services/exchange.py new file mode 100644 index 0000000..cf1f641 --- /dev/null +++ b/backend/services/exchange.py @@ -0,0 +1,392 @@ +"""Exchange service for business logic related to Bitcoin trading.""" + +import uuid +from datetime import UTC, date, datetime, time, timedelta + +from sqlalchemy import and_, select +from sqlalchemy.exc import IntegrityError +from sqlalchemy.ext.asyncio import AsyncSession + +from date_validation import validate_date_in_range +from exceptions import ( + BadRequestError, + ConflictError, + NotFoundError, + ServiceUnavailableError, +) +from models import ( + Availability, + BitcoinTransferMethod, + Exchange, + ExchangeStatus, + PriceHistory, + TradeDirection, + User, +) +from repositories.price import PriceRepository +from shared_constants import ( + EUR_TRADE_INCREMENT, + EUR_TRADE_MAX, + EUR_TRADE_MIN, + LIGHTNING_MAX_EUR, + PREMIUM_PERCENTAGE, + PRICE_STALENESS_SECONDS, + SLOT_DURATION_MINUTES, +) + +# Constants for satoshi calculations +SATS_PER_BTC = 100_000_000 + + +class ExchangeService: + """Service for exchange-related business logic.""" + + def __init__(self, db: AsyncSession): + self.db = db + self.price_repo = PriceRepository(db) + + def apply_premium_for_direction( + self, + market_price: float, + premium_percentage: int, + direction: TradeDirection, + ) -> float: + """ + Apply premium to market price based on trade direction. + + The premium is always favorable to the admin: + - When user BUYS BTC: user pays MORE (market * (1 + premium/100)) + - When user SELLS BTC: user receives LESS (market * (1 - premium/100)) + """ + if direction == TradeDirection.BUY: + return market_price * (1 + premium_percentage / 100) + else: # SELL + return market_price * (1 - premium_percentage / 100) + + def calculate_sats_amount( + self, + eur_cents: int, + price_eur_per_btc: float, + ) -> int: + """ + Calculate satoshi amount from EUR cents and price. + + Args: + eur_cents: Amount in EUR cents (e.g., 10000 = €100) + price_eur_per_btc: Price in EUR per BTC + + Returns: + Amount in satoshis + """ + eur_amount = eur_cents / 100 + btc_amount = eur_amount / price_eur_per_btc + return int(btc_amount * SATS_PER_BTC) + + def is_price_stale(self, price_timestamp: datetime) -> bool: + """Check if a price is older than the staleness threshold.""" + age_seconds = (datetime.now(UTC) - price_timestamp).total_seconds() + return age_seconds > PRICE_STALENESS_SECONDS + + async def get_latest_price(self) -> PriceHistory | None: + """Get the most recent price from the database.""" + return await self.price_repo.get_latest() + + async def validate_slot_timing(self, slot_start: datetime) -> None: + """Validate slot timing - compute valid boundaries from slot duration.""" + valid_minutes = tuple(range(0, 60, SLOT_DURATION_MINUTES)) + if slot_start.minute not in valid_minutes: + raise BadRequestError( + f"Slot must be on {SLOT_DURATION_MINUTES}-minute boundary" + ) + if slot_start.second != 0 or slot_start.microsecond != 0: + raise BadRequestError( + "Slot start time must not have seconds or microseconds" + ) + + async def validate_slot_availability( + self, slot_start: datetime, slot_date: date + ) -> None: + """Verify slot falls within availability.""" + slot_start_time = slot_start.time() + slot_end_dt = slot_start + timedelta(minutes=SLOT_DURATION_MINUTES) + slot_end_time = slot_end_dt.time() + + result = await self.db.execute( + select(Availability).where( + and_( + Availability.date == slot_date, + Availability.start_time <= slot_start_time, + Availability.end_time >= slot_end_time, + ) + ) + ) + matching_availability = result.scalar_one_or_none() + + if not matching_availability: + slot_str = slot_start.strftime("%Y-%m-%d %H:%M") + raise BadRequestError(f"Selected slot at {slot_str} UTC is not available") + + async def validate_price_not_stale(self) -> PriceHistory: + """Validate price exists and is not stale.""" + cached_price = await self.get_latest_price() + + if cached_price is None: + raise ServiceUnavailableError( + "Price data unavailable. Please try again later." + ) + + if self.is_price_stale(cached_price.timestamp): + raise ServiceUnavailableError( + "Price is stale. Please refresh and try again." + ) + + return cached_price + + async def validate_eur_amount(self, eur_amount: int) -> None: + """Validate EUR amount is within configured limits.""" + if eur_amount < EUR_TRADE_MIN * 100: + raise BadRequestError(f"EUR amount must be at least €{EUR_TRADE_MIN}") + if eur_amount > EUR_TRADE_MAX * 100: + raise BadRequestError(f"EUR amount must be at most €{EUR_TRADE_MAX}") + if eur_amount % (EUR_TRADE_INCREMENT * 100) != 0: + raise BadRequestError( + f"EUR amount must be a multiple of €{EUR_TRADE_INCREMENT}" + ) + + async def validate_lightning_threshold( + self, bitcoin_transfer_method: BitcoinTransferMethod, eur_amount: int + ) -> None: + """Validate Lightning threshold.""" + if ( + bitcoin_transfer_method == BitcoinTransferMethod.LIGHTNING + and eur_amount > LIGHTNING_MAX_EUR * 100 + ): + raise BadRequestError( + f"Lightning payments are only allowed for amounts up to " + f"€{LIGHTNING_MAX_EUR}. For amounts above €{LIGHTNING_MAX_EUR}, " + "please use onchain transactions." + ) + + async def check_existing_trade_on_date( + self, user: User, slot_date: date + ) -> Exchange | None: + """Check if user already has a trade on this date.""" + existing_trade_query = select(Exchange).where( + and_( + Exchange.user_id == user.id, + Exchange.slot_start + >= datetime.combine(slot_date, time.min, tzinfo=UTC), + Exchange.slot_start + < datetime.combine(slot_date, time.max, tzinfo=UTC) + timedelta(days=1), + Exchange.status == ExchangeStatus.BOOKED, + ) + ) + result = await self.db.execute(existing_trade_query) + return result.scalar_one_or_none() + + async def check_slot_already_booked(self, slot_start: datetime) -> Exchange | None: + """Check if slot is already booked (only consider BOOKED status).""" + slot_booked_query = select(Exchange).where( + and_( + Exchange.slot_start == slot_start, + Exchange.status == ExchangeStatus.BOOKED, + ) + ) + result = await self.db.execute(slot_booked_query) + return result.scalar_one_or_none() + + async def create_exchange( + self, + user: User, + slot_start: datetime, + direction: TradeDirection, + bitcoin_transfer_method: BitcoinTransferMethod, + eur_amount: int, + ) -> Exchange: + """ + Create a new exchange trade booking with all business validation. + + Raises: + BadRequestError: For validation failures + ConflictError: If slot is already booked or user has trade on date + ServiceUnavailableError: If price is unavailable or stale + """ + slot_date = slot_start.date() + validate_date_in_range(slot_date, context="book") + + # Check if user already has a trade on this date + existing_trade = await self.check_existing_trade_on_date(user, slot_date) + if existing_trade: + raise BadRequestError( + f"You already have a trade booked on {slot_date.strftime('%Y-%m-%d')}. " + f"Only one trade per day is allowed. " + f"Trade ID: {existing_trade.public_id}" + ) + + # Validate EUR amount + await self.validate_eur_amount(eur_amount) + + # Validate Lightning threshold + await self.validate_lightning_threshold(bitcoin_transfer_method, eur_amount) + + # Validate slot timing + await self.validate_slot_timing(slot_start) + + # Verify slot falls within availability + await self.validate_slot_availability(slot_start, slot_date) + + # Get and validate price + cached_price = await self.validate_price_not_stale() + + # Calculate agreed price based on direction + market_price = cached_price.price + agreed_price = self.apply_premium_for_direction( + market_price, PREMIUM_PERCENTAGE, direction + ) + + # Calculate sats amount based on agreed price + sats_amount = self.calculate_sats_amount(eur_amount, agreed_price) + + # Check if slot is already booked + slot_booked = await self.check_slot_already_booked(slot_start) + if slot_booked: + slot_str = slot_start.strftime("%Y-%m-%d %H:%M") + raise ConflictError( + f"This slot at {slot_str} UTC has already been booked. " + "Select another slot." + ) + + # Create the exchange + slot_end_dt = slot_start + timedelta(minutes=SLOT_DURATION_MINUTES) + exchange = Exchange( + user_id=user.id, + slot_start=slot_start, + slot_end=slot_end_dt, + direction=direction, + bitcoin_transfer_method=bitcoin_transfer_method, + eur_amount=eur_amount, + sats_amount=sats_amount, + market_price_eur=market_price, + agreed_price_eur=agreed_price, + premium_percentage=PREMIUM_PERCENTAGE, + status=ExchangeStatus.BOOKED, + ) + + self.db.add(exchange) + + try: + await self.db.commit() + await self.db.refresh(exchange) + except IntegrityError as e: + await self.db.rollback() + # This should rarely happen now since we check explicitly above, + # but keep it for other potential integrity violations + raise ConflictError( + "Database constraint violation. Please try again." + ) from e + + return exchange + + async def get_exchange_by_public_id( + self, public_id: uuid.UUID, user: User | None = None + ) -> Exchange: + """ + Get an exchange by public ID, optionally checking ownership. + + Raises: + NotFoundError: If exchange not found or user doesn't own it + (for security, returns 404) + """ + query = select(Exchange).where(Exchange.public_id == public_id) + result = await self.db.execute(query) + exchange = result.scalar_one_or_none() + + if not exchange: + raise NotFoundError("Trade") + + # Check ownership if user is provided - return 404 for security + # (prevents info leakage) + if user and exchange.user_id != user.id: + raise NotFoundError("Trade") + + return exchange + + async def cancel_exchange( + self, exchange: Exchange, user: User, is_admin: bool = False + ) -> Exchange: + """ + Cancel an exchange trade. + + Raises: + BadRequestError: If cancellation is not allowed + NotFoundError: If user doesn't own the exchange (when not admin, + returns 404 for security) + """ + if not is_admin and exchange.user_id != user.id: + raise NotFoundError("Trade") + + if exchange.status != ExchangeStatus.BOOKED: + raise BadRequestError(f"Cannot cancel: status is '{exchange.status.value}'") + + if exchange.slot_start <= datetime.now(UTC): + raise BadRequestError("Cannot cancel: trade slot time has already passed") + + exchange.status = ( + ExchangeStatus.CANCELLED_BY_ADMIN + if is_admin + else ExchangeStatus.CANCELLED_BY_USER + ) + exchange.cancelled_at = datetime.now(UTC) + + await self.db.commit() + await self.db.refresh(exchange) + + return exchange + + async def complete_exchange(self, exchange: Exchange) -> Exchange: + """ + Mark an exchange as completed. + + Raises: + BadRequestError: If completion is not allowed + """ + if exchange.slot_start > datetime.now(UTC): + raise BadRequestError("Cannot complete: trade slot has not yet started") + + if exchange.status != ExchangeStatus.BOOKED: + raise BadRequestError( + f"Cannot complete: status is '{exchange.status.value}'" + ) + + exchange.status = ExchangeStatus.COMPLETED + exchange.completed_at = datetime.now(UTC) + + await self.db.commit() + await self.db.refresh(exchange) + + return exchange + + async def mark_no_show(self, exchange: Exchange) -> Exchange: + """ + Mark an exchange as no-show. + + Raises: + BadRequestError: If marking as no-show is not allowed + """ + if exchange.slot_start > datetime.now(UTC): + raise BadRequestError( + "Cannot mark as no-show: trade slot has not yet started" + ) + + if exchange.status != ExchangeStatus.BOOKED: + raise BadRequestError( + f"Cannot mark as no-show: status is '{exchange.status.value}'" + ) + + exchange.status = ExchangeStatus.NO_SHOW + exchange.completed_at = datetime.now(UTC) + + await self.db.commit() + await self.db.refresh(exchange) + + return exchange