arbret/backend/routes/exchange.py
counterweight d317939ad0
Step 5: Update exchange price endpoint to use new pricing config
- Update ExchangeConfigResponse schema with direction-specific fields
- Remove premium_percentage from PriceResponse (now in config)
- Update price endpoint to load pricing config from database
- Update frontend to use direction-specific min/max and calculate premium
- Update tests to seed pricing config
- Add logic to clamp amount when direction changes
2025-12-26 20:20:23 +01:00

399 lines
14 KiB
Python

"""Exchange routes for Bitcoin trading."""
import uuid
from datetime import date
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.ext.asyncio import AsyncSession
from auth import require_permission
from database import get_db
from mappers import ExchangeMapper
from models import (
BitcoinTransferMethod,
ExchangeStatus,
Permission,
PriceHistory,
TradeDirection,
User,
)
from price_fetcher import PAIR_BTC_EUR, SOURCE_BITFINEX, fetch_btc_eur_price
from repositories.exchange import ExchangeRepository
from repositories.price import PriceRepository
from repositories.pricing import PricingRepository
from schemas import (
AdminExchangeResponse,
AvailableSlotsResponse,
ExchangeConfigResponse,
ExchangePriceResponse,
ExchangeRequest,
ExchangeResponse,
PriceResponse,
UserSearchResult,
)
from services.exchange import ExchangeService
from shared_constants import EUR_TRADE_INCREMENT
from utils.enum_validation import validate_enum
# Router for the exchange endpoints defined in this module; every route
# registered on it is served under the /api/exchange prefix.
router = APIRouter(
    prefix="/api/exchange",
    tags=["exchange"],
)
# =============================================================================
# Helper functions
# =============================================================================
# =============================================================================
# Price Endpoint
# =============================================================================
@router.get("/price", response_model=ExchangePriceResponse)
async def get_exchange_price(
    db: AsyncSession = Depends(get_db),
    _current_user: User = Depends(require_permission(Permission.CREATE_EXCHANGE)),
) -> ExchangePriceResponse:
    """
    Return the current BTC/EUR price together with the trading configuration.

    The pricing configuration is loaded first; without it the endpoint cannot
    answer. The most recent stored price is then used if the service does not
    consider it stale. Otherwise a fresh price is fetched, persisted as a new
    ``PriceHistory`` row, and returned.

    If fetching or persisting the fresh price fails, the endpoint degrades
    gracefully instead of raising:
    - with a stored price available, that price is returned with
      ``is_stale=True`` and an ``error`` message;
    - with no stored price at all, ``price`` is ``None`` and ``error`` is set.

    Args:
        db: Database session injected by FastAPI.
        _current_user: Injected only to enforce the CREATE_EXCHANGE permission.

    Returns:
        ExchangePriceResponse carrying the price (or ``None``), the trading
        config (min/max EUR per direction, premiums, increment) and an
        optional error message.

    Raises:
        HTTPException: 503 when no pricing configuration exists.
    """
    price_repo = PriceRepository(db)
    pricing_repo = PricingRepository(db)
    service = ExchangeService(db)

    # The config is part of every response, so bail out early when missing.
    pricing_config = await pricing_repo.get_current()
    if pricing_config is None:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Pricing configuration not available",
        )

    config = ExchangeConfigResponse(
        eur_min_buy=pricing_config.eur_min_buy,
        eur_max_buy=pricing_config.eur_max_buy,
        eur_min_sell=pricing_config.eur_min_sell,
        eur_max_sell=pricing_config.eur_max_sell,
        eur_increment=EUR_TRADE_INCREMENT,
        premium_buy=pricing_config.premium_buy,
        premium_sell=pricing_config.premium_sell,
        small_trade_threshold_eur=pricing_config.small_trade_threshold_eur,
        small_trade_extra_premium=pricing_config.small_trade_extra_premium,
    )

    latest = await price_repo.get_latest()

    # Fast path: a stored price exists and the service still accepts it.
    if latest is not None and not service.is_price_stale(latest.timestamp):
        return ExchangePriceResponse(
            price=PriceResponse(
                market_price=latest.price,
                timestamp=latest.timestamp,
                is_stale=service.is_price_stale(latest.timestamp),
            ),
            config=config,
        )

    # Slow path: nothing stored, or the stored price is stale -> refresh it.
    try:
        fresh_value, fetched_at = await fetch_btc_eur_price()

        # Persist the fresh quote so later requests can reuse it.
        record = PriceHistory(
            source=SOURCE_BITFINEX,
            pair=PAIR_BTC_EUR,
            price=fresh_value,
            timestamp=fetched_at,
        )
        db.add(record)
        await db.commit()
        await db.refresh(record)

        return ExchangePriceResponse(
            price=PriceResponse(
                market_price=fresh_value,
                timestamp=fetched_at,
                is_stale=False,
            ),
            config=config,
        )
    except Exception as exc:
        # Best effort by design: report the failure in the payload rather
        # than failing the request.
        if latest is None:
            # Nothing stored and the refresh failed: no price to offer.
            return ExchangePriceResponse(
                price=None,
                config=config,
                error=f"Price unavailable: {exc}",
            )

        # Fall back to the stored price and flag it as stale.
        return ExchangePriceResponse(
            price=PriceResponse(
                market_price=latest.price,
                timestamp=latest.timestamp,
                is_stale=True,
            ),
            config=config,
            error=f"Failed to fetch fresh price: {exc}",
        )
# =============================================================================
# Available Slots Endpoint
# =============================================================================
@router.get("/slots", response_model=AvailableSlotsResponse)
async def get_available_slots(
    date_param: date = Query(..., alias="date"),
    db: AsyncSession = Depends(get_db),
    _current_user: User = Depends(require_permission(Permission.CREATE_EXCHANGE)),
) -> AvailableSlotsResponse:
    """
    List the booking slots that can still be reserved on a given date.

    The work is delegated to ``ExchangeService.get_available_slots``. Per the
    endpoint contract the result contains slots that fall within
    admin-defined availability windows and are not already booked (the
    filtering itself lives in the service, not in this route).

    Args:
        date_param: Day to look up; supplied as the ``date`` query parameter.
        db: Database session injected by FastAPI.
        _current_user: Injected only to enforce the CREATE_EXCHANGE permission.

    Returns:
        Whatever the service produces for ``date_param``.
    """
    slots = await ExchangeService(db).get_available_slots(date_param)
    return slots
# =============================================================================
# Create Exchange Endpoint
# =============================================================================
@router.post("", response_model=ExchangeResponse)
async def create_exchange(
    request: ExchangeRequest,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_permission(Permission.CREATE_EXCHANGE)),
) -> ExchangeResponse:
    """
    Book a new exchange trade for the authenticated user.

    The route itself only converts the raw ``direction`` and
    ``bitcoin_transfer_method`` values into their enums; everything else is
    handed to ``ExchangeService.create_exchange``. Per the endpoint contract
    the service validates that:
    - the slot is on a valid date and time boundary
    - the slot is within admin availability
    - the slot is not already booked
    - the price is not stale
    - the EUR amount is within configured limits

    Args:
        request: Booking payload (slot, direction, transfer method, amount).
        db: Database session injected by FastAPI.
        current_user: Authenticated user holding CREATE_EXCHANGE.

    Returns:
        The created exchange mapped to ``ExchangeResponse``.
    """
    # Enum conversion happens in this order: direction, then transfer method.
    trade_direction: TradeDirection = validate_enum(
        TradeDirection, request.direction, "direction"
    )
    transfer_method: BitcoinTransferMethod = validate_enum(
        BitcoinTransferMethod,
        request.bitcoin_transfer_method,
        "bitcoin_transfer_method",
    )

    # All remaining validation lives in the service layer.
    booked = await ExchangeService(db).create_exchange(
        user=current_user,
        slot_start=request.slot_start,
        direction=trade_direction,
        bitcoin_transfer_method=transfer_method,
        eur_amount=request.eur_amount,
    )
    return ExchangeMapper.to_response(booked, current_user.email)
# =============================================================================
# User's Exchanges Endpoints
# =============================================================================
# Router for the endpoints that operate on the current user's own trades,
# served under the /api/trades prefix.
trades_router = APIRouter(
    prefix="/api/trades",
    tags=["trades"],
)
@trades_router.get("", response_model=list[ExchangeResponse])
async def get_my_trades(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_permission(Permission.VIEW_OWN_EXCHANGES)),
) -> list[ExchangeResponse]:
    """
    Return every exchange belonging to the authenticated user.

    The repository is queried with ``order_by_desc=True``, so the newest
    exchanges are expected to come first.
    """
    own_exchanges = await ExchangeRepository(db).get_by_user_id(
        current_user.id,
        order_by_desc=True,
    )
    return [
        ExchangeMapper.to_response(own, current_user.email)
        for own in own_exchanges
    ]
@trades_router.get("/{public_id}", response_model=ExchangeResponse)
async def get_my_trade(
    public_id: uuid.UUID,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_permission(Permission.VIEW_OWN_EXCHANGES)),
) -> ExchangeResponse:
    """
    Fetch a single trade by its public ID.

    The lookup is scoped to ``current_user`` by passing the user to the
    service, so a user can only access their own trades.
    """
    trade = await ExchangeService(db).get_exchange_by_public_id(
        public_id,
        user=current_user,
    )
    return ExchangeMapper.to_response(trade, current_user.email)
@trades_router.post("/{public_id}/cancel", response_model=ExchangeResponse)
async def cancel_my_trade(
    public_id: uuid.UUID,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_permission(Permission.CANCEL_OWN_EXCHANGE)),
) -> ExchangeResponse:
    """
    Cancel one of the authenticated user's own exchanges.

    Raises:
        HTTPException: 403 when the exchange belongs to a different user.
    """
    service = ExchangeService(db)

    # Deliberately look the trade up WITHOUT a user filter so that ownership
    # can be checked here and reported as an explicit 403.
    trade = await service.get_exchange_by_public_id(public_id)

    is_owner = trade.user_id == current_user.id
    if not is_owner:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Cannot cancel another user's trade",
        )

    cancelled = await service.cancel_exchange(trade, current_user, is_admin=False)
    return ExchangeMapper.to_response(cancelled, current_user.email)
# =============================================================================
# Admin Exchanges Endpoints
# =============================================================================
# Router for the admin-side trade management endpoints, served under the
# /api/admin/trades prefix.
admin_trades_router = APIRouter(
    prefix="/api/admin/trades",
    tags=["admin-trades"],
)
@admin_trades_router.get("/upcoming", response_model=list[AdminExchangeResponse])
async def get_upcoming_trades(
    db: AsyncSession = Depends(get_db),
    _current_user: User = Depends(require_permission(Permission.VIEW_ALL_EXCHANGES)),
) -> list[AdminExchangeResponse]:
    """
    Return all upcoming booked trades for the admin view.

    Results come from ``ExchangeRepository.get_upcoming_booked``; per the
    endpoint contract they are sorted by slot time ascending (ordering is
    done by the repository, not here).
    """
    upcoming = await ExchangeRepository(db).get_upcoming_booked()
    return [ExchangeMapper.to_admin_response(trade) for trade in upcoming]
@admin_trades_router.get("/past", response_model=list[AdminExchangeResponse])
async def get_past_trades(
    status: str | None = None,
    start_date: date | None = None,
    end_date: date | None = None,
    user_search: str | None = None,
    db: AsyncSession = Depends(get_db),
    _current_user: User = Depends(require_permission(Permission.VIEW_ALL_EXCHANGES)),
) -> list[AdminExchangeResponse]:
    """
    Return past trades for the admin view, narrowed by optional filters.

    Args:
        status: Exchange status name to filter on. A missing or empty value
            means "no status filter".
        start_date: Passed to the repository as the start of the slot date
            range.
        end_date: Passed to the repository as the end of the slot date range.
        user_search: Passed to the repository to search by user email
            (partial match, per the endpoint contract).
        db: Database session injected by FastAPI.
        _current_user: Injected only to enforce VIEW_ALL_EXCHANGES.

    Returns:
        The matching exchanges mapped to ``AdminExchangeResponse``.
    """
    # NOTE: inside this function the ``status`` query parameter shadows the
    # module-level ``fastapi.status`` import.
    status_filter: ExchangeStatus | None = (
        validate_enum(ExchangeStatus, status, "status") if status else None
    )

    past = await ExchangeRepository(db).get_past_trades(
        status=status_filter,
        start_date=start_date,
        end_date=end_date,
        user_search=user_search,
    )
    return [ExchangeMapper.to_admin_response(trade) for trade in past]
@admin_trades_router.post("/{public_id}/complete", response_model=AdminExchangeResponse)
async def complete_trade(
    public_id: uuid.UUID,
    db: AsyncSession = Depends(get_db),
    _current_user: User = Depends(require_permission(Permission.COMPLETE_EXCHANGE)),
) -> AdminExchangeResponse:
    """
    Mark a trade as completed.

    Per the endpoint contract this is only possible after the slot time has
    passed; that rule is presumably enforced by
    ``ExchangeService.complete_exchange`` (not visible here).
    """
    service = ExchangeService(db)
    trade = await service.get_exchange_by_public_id(public_id)
    completed = await service.complete_exchange(trade)
    return ExchangeMapper.to_admin_response(completed)
@admin_trades_router.post("/{public_id}/no-show", response_model=AdminExchangeResponse)
async def mark_no_show(
    public_id: uuid.UUID,
    db: AsyncSession = Depends(get_db),
    _current_user: User = Depends(require_permission(Permission.COMPLETE_EXCHANGE)),
) -> AdminExchangeResponse:
    """
    Mark a trade as a no-show.

    Per the endpoint contract this is only possible after the slot time has
    passed; that rule is presumably enforced by
    ``ExchangeService.mark_no_show`` (not visible here).
    """
    service = ExchangeService(db)
    trade = await service.get_exchange_by_public_id(public_id)
    flagged = await service.mark_no_show(trade)
    return ExchangeMapper.to_admin_response(flagged)
@admin_trades_router.post("/{public_id}/cancel", response_model=AdminExchangeResponse)
async def admin_cancel_trade(
    public_id: uuid.UUID,
    db: AsyncSession = Depends(get_db),
    _current_user: User = Depends(require_permission(Permission.CANCEL_ANY_EXCHANGE)),
) -> AdminExchangeResponse:
    """
    Cancel any trade on behalf of an admin.

    Unlike the user-facing cancel route there is no ownership check; the
    CANCEL_ANY_EXCHANGE permission is the only gate applied here.
    """
    service = ExchangeService(db)
    trade = await service.get_exchange_by_public_id(public_id)

    # Despite its leading underscore, ``_current_user`` IS used: it is passed
    # to the service as the acting (admin) user.
    cancelled = await service.cancel_exchange(trade, _current_user, is_admin=True)
    return ExchangeMapper.to_admin_response(cancelled)
# =============================================================================
# Admin User Search Endpoint
# =============================================================================
# Router for the admin user-lookup endpoints, served under the
# /api/admin/users prefix.
admin_users_router = APIRouter(
    prefix="/api/admin/users",
    tags=["admin-users"],
)
@admin_users_router.get("/search", response_model=list[UserSearchResult])
async def search_users(
    q: str = Query(..., min_length=1, description="Search query for user email"),
    db: AsyncSession = Depends(get_db),
    _current_user: User = Depends(require_permission(Permission.VIEW_ALL_EXCHANGES)),
) -> list[UserSearchResult]:
    """
    Search users by email for autocomplete.

    Returns users whose email contains the search query as a literal
    substring (case-insensitive), ordered by email and limited to 10 results.

    The query text is matched literally: the SQL LIKE wildcards ``%`` and
    ``_`` in ``q`` are escaped, so searching for ``john_doe`` does not also
    match ``johnxdoe`` and searching for ``%`` does not return every user.

    Args:
        q: Non-empty search text supplied by the client.
        db: Database session injected by FastAPI.
        _current_user: Injected only to enforce VIEW_ALL_EXCHANGES.

    Returns:
        Up to 10 ``UserSearchResult`` entries (id and email).
    """
    # Note: UserRepository doesn't have search yet, but we can add it.
    # For now, keeping a direct query (and the local import) for this
    # specific use case.
    from sqlalchemy import select

    # Escape the escape character first, then the LIKE wildcards, so the
    # user's text is matched literally. "/" is used as the escape character
    # to avoid backslash handling that differs between database backends.
    like_escape = "/"
    literal_query = (
        q.replace(like_escape, like_escape * 2)
        .replace("%", f"{like_escape}%")
        .replace("_", f"{like_escape}_")
    )

    statement = (
        select(User)
        .where(User.email.ilike(f"%{literal_query}%", escape=like_escape))
        .order_by(User.email)
        .limit(10)
    )
    result = await db.execute(statement)
    users = result.scalars().all()
    return [UserSearchResult(id=u.id, email=u.email) for u in users]
# Every router defined in this module, gathered in one list for easy
# registration.
routers = [
    router,
    trades_router,
    admin_trades_router,
    admin_users_router,
]