arbret/backend/routes/exchange.py
counterweight ce9159c5b0
Phase 2.1-2.3: Add exchange endpoints
Add exchange trading endpoints:
- POST /api/exchange: Create exchange trade
  - Validates slot, price staleness, EUR amount limits
  - Calculates sats from EUR and agreed price
  - Direction-specific premium (buy=+5%, sell=-5%)

- GET /api/trades: List user's exchanges
- POST /api/trades/{id}/cancel: Cancel user's exchange

Add schemas:
- ExchangeRequest, ExchangeResponse
- ExchangeUserContact, AdminExchangeResponse (for Phase 2.4)
- PaginatedExchanges, PaginatedAdminExchanges
2025-12-22 18:28:56 +01:00

472 lines
15 KiB
Python

"""Exchange routes for Bitcoin trading."""
from datetime import UTC, datetime, timedelta
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy import and_, desc, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload
from auth import require_permission
from database import get_db
from date_validation import validate_date_in_range
from models import (
Availability,
Exchange,
ExchangeStatus,
Permission,
PriceHistory,
TradeDirection,
User,
)
from price_fetcher import PAIR_BTC_EUR, SOURCE_BITFINEX, fetch_btc_eur_price
from schemas import ExchangeRequest, ExchangeResponse
from shared_constants import (
EUR_TRADE_INCREMENT,
EUR_TRADE_MAX,
EUR_TRADE_MIN,
PREMIUM_PERCENTAGE,
PRICE_STALENESS_SECONDS,
SLOT_DURATION_MINUTES,
)
# Router for the price lookup and trade-creation endpoints; every route
# declared on it is served under the /api/exchange prefix.
router = APIRouter(prefix="/api/exchange", tags=["exchange"])
# =============================================================================
# Constants for satoshi calculations
# =============================================================================
# Number of satoshis in one bitcoin; used to convert a BTC amount into sats.
SATS_PER_BTC = 100_000_000
# =============================================================================
# Pydantic models for price endpoint
# =============================================================================
class ExchangeConfigResponse(BaseModel):
    """Trading limits and premium sent to the frontend.

    The price endpoint fills these fields from the ``EUR_TRADE_*`` and
    ``PREMIUM_PERCENTAGE`` shared constants.
    """

    # Smallest accepted trade (the create endpoint compares it as euros * 100
    # against an amount in cents, so this is whole euros).
    eur_min: int
    # Largest accepted trade, same unit as ``eur_min``.
    eur_max: int
    # Trade amounts must be a multiple of this step, same unit as ``eur_min``.
    eur_increment: int
    # Premium applied on top of / below the market price, in percent.
    premium_percentage: int
class PriceResponse(BaseModel):
    """A BTC/EUR quote: the raw market price plus the premium-adjusted price."""

    # Price as obtained from the price source, before any premium.
    market_price: float
    # Market price after the premium has been applied.
    agreed_price: float
    # Premium used to derive ``agreed_price``, in percent.
    premium_percentage: int
    # Time the market price was recorded.
    timestamp: datetime
    # True when the quote is older than the configured staleness threshold.
    is_stale: bool
class ExchangePriceResponse(BaseModel):
    """Envelope returned by the price endpoint: quote, config and error text."""

    # The quote, or None when no price could be obtained at all.
    price: PriceResponse | None
    # Trading limits; always present, even when ``price`` is None.
    config: ExchangeConfigResponse
    # Human-readable reason when the quote is missing or could not be refreshed.
    error: str | None = None
# =============================================================================
# Helper functions
# =============================================================================
def apply_premium_for_direction(
    market_price: float,
    premium_percentage: int,
    direction: TradeDirection,
) -> float:
    """
    Adjust a market price by the premium, in the admin's favour.

    Args:
        market_price: Raw market price.
        premium_percentage: Premium in percent (e.g. 5 means 5%).
        direction: Trade direction from the user's point of view.

    Returns:
        ``market_price`` scaled up by the premium when the user buys BTC
        (the user pays more), and scaled down by the premium for any other
        direction, i.e. a sell (the user receives less).
    """
    premium_fraction = premium_percentage / 100
    if direction == TradeDirection.BUY:
        multiplier = 1 + premium_fraction
    else:  # SELL
        multiplier = 1 - premium_fraction
    return market_price * multiplier
def apply_premium(market_price: float, premium_percentage: int) -> float:
    """Return the buy-side (marked-up) price, as used for price display."""
    buy_side = TradeDirection.BUY
    return apply_premium_for_direction(market_price, premium_percentage, buy_side)
def calculate_sats_amount(
    eur_cents: int,
    price_eur_per_btc: float,
) -> int:
    """
    Calculate satoshi amount from EUR cents and price.

    The cents-to-sats scaling is done in exact integer arithmetic so that the
    only floating-point operation is the final division by the price. Doing
    ``cents / 100 / price * SATS_PER_BTC`` instead rounds three times and,
    combined with truncation, can come out one satoshi short (e.g. EUR 29.00
    at 100 EUR/BTC would yield 28_999_999 rather than 29_000_000).

    Args:
        eur_cents: Amount in EUR cents (e.g., 10000 = EUR 100).
        price_eur_per_btc: Price in EUR per BTC; must be positive.

    Returns:
        Amount in satoshis, truncated toward zero.

    Raises:
        ValueError: If ``price_eur_per_btc`` is zero or negative.
    """
    if price_eur_per_btc <= 0:
        raise ValueError("price_eur_per_btc must be positive")
    # Sats this amount would buy at a price of exactly 1 EUR/BTC. The floor
    # division is exact because SATS_PER_BTC is a multiple of 100.
    sats_at_unit_price = eur_cents * SATS_PER_BTC // 100
    return int(sats_at_unit_price / price_eur_per_btc)
async def get_latest_price(db: AsyncSession) -> PriceHistory | None:
    """Fetch the newest stored Bitfinex BTC/EUR price row, or None if there is none."""
    newest_first = desc(PriceHistory.timestamp)
    stmt = (
        select(PriceHistory)
        .where(PriceHistory.source == SOURCE_BITFINEX)
        .where(PriceHistory.pair == PAIR_BTC_EUR)
        .order_by(newest_first)
        .limit(1)
    )
    rows = await db.execute(stmt)
    return rows.scalar_one_or_none()
def is_price_stale(price_timestamp: datetime) -> bool:
    """Return True when the price's age exceeds PRICE_STALENESS_SECONDS."""
    # NOTE(review): subtracting from an aware "now" presumably requires
    # price_timestamp to be timezone-aware as well — confirm against the model.
    now = datetime.now(UTC)
    elapsed = now - price_timestamp
    return elapsed.total_seconds() > PRICE_STALENESS_SECONDS
def _to_exchange_response(
    exchange: Exchange,
    user_email: str | None = None,
) -> ExchangeResponse:
    """
    Build the API schema for an Exchange row.

    Args:
        exchange: The ORM row to serialise.
        user_email: Email to report for the owner. When omitted, it is read
            from ``exchange.user.email``.
            NOTE(review): with an AsyncSession that fallback presumably needs
            the ``user`` relationship to be loaded already — confirm.

    Returns:
        An ExchangeResponse mirroring the row, with the direction and status
        enums flattened to their ``.value``.
    """
    if user_email is None:
        user_email = exchange.user.email
    fields = {
        "id": exchange.id,
        "user_id": exchange.user_id,
        "user_email": user_email,
        "slot_start": exchange.slot_start,
        "slot_end": exchange.slot_end,
        "direction": exchange.direction.value,
        "eur_amount": exchange.eur_amount,
        "sats_amount": exchange.sats_amount,
        "market_price_eur": exchange.market_price_eur,
        "agreed_price_eur": exchange.agreed_price_eur,
        "premium_percentage": exchange.premium_percentage,
        "status": exchange.status.value,
        "created_at": exchange.created_at,
        "cancelled_at": exchange.cancelled_at,
        "completed_at": exchange.completed_at,
    }
    return ExchangeResponse(**fields)
# =============================================================================
# Price Endpoint
# =============================================================================
@router.get("/price", response_model=ExchangePriceResponse)
async def get_exchange_price(
    db: AsyncSession = Depends(get_db),
    _current_user: User = Depends(require_permission(Permission.BOOK_APPOINTMENT)),
) -> ExchangePriceResponse:
    """
    Get the current BTC/EUR price for trading.

    Returns the latest price from the database. If no price exists or the
    price is stale, attempts to fetch a fresh price and store it. If that
    fails, falls back to the stale cached price (flagged as stale) or, when
    nothing is cached, to a response with ``price=None``. This endpoint never
    raises for a failed refresh; the reason is reported in ``error``.

    The response includes:
    - market_price: The raw price from the exchange
    - agreed_price: The price with the buy-side premium applied
    - is_stale: Whether the price is older than PRICE_STALENESS_SECONDS
    - config: Trading configuration (min/max EUR, increment, premium)
    """
    config = ExchangeConfigResponse(
        eur_min=EUR_TRADE_MIN,
        eur_max=EUR_TRADE_MAX,
        eur_increment=EUR_TRADE_INCREMENT,
        premium_percentage=PREMIUM_PERCENTAGE,
    )

    def build_quote(
        market_price: float, timestamp: datetime, is_stale: bool
    ) -> PriceResponse:
        # Single place where a quote is assembled, so all branches agree.
        return PriceResponse(
            market_price=market_price,
            agreed_price=apply_premium(market_price, PREMIUM_PERCENTAGE),
            premium_percentage=PREMIUM_PERCENTAGE,
            timestamp=timestamp,
            is_stale=is_stale,
        )

    cached_price = await get_latest_price(db)
    cached_value: float | None = None
    cached_timestamp: datetime | None = None
    if cached_price is not None:
        # Copy the values out of the ORM object now: if the write below fails
        # and the session is rolled back, the object is expired and can no
        # longer be read safely from the error handler.
        cached_value = cached_price.price
        cached_timestamp = cached_price.timestamp
        if not is_price_stale(cached_timestamp):
            return ExchangePriceResponse(
                price=build_quote(cached_value, cached_timestamp, False),
                config=config,
            )

    # No cached price, or it is stale: try to fetch and persist a new one.
    try:
        price_value, timestamp = await fetch_btc_eur_price()
        db.add(
            PriceHistory(
                source=SOURCE_BITFINEX,
                pair=PAIR_BTC_EUR,
                price=price_value,
                timestamp=timestamp,
            )
        )
        await db.commit()
    except Exception as e:
        # Deliberately broad: any fetch or storage failure degrades to the
        # cached quote instead of failing the request. Roll back so a failed
        # flush does not leave the session unusable.
        await db.rollback()
        # NOTE(review): the exception text is returned to the client below;
        # confirm it cannot expose internal details.
        if cached_value is not None and cached_timestamp is not None:
            return ExchangePriceResponse(
                price=build_quote(cached_value, cached_timestamp, True),
                config=config,
                error=f"Failed to fetch fresh price: {e}",
            )
        # No cached price and fetch failed
        return ExchangePriceResponse(
            price=None,
            config=config,
            error=f"Price unavailable: {e}",
        )

    return ExchangePriceResponse(
        price=build_quote(price_value, timestamp, False),
        config=config,
    )
# =============================================================================
# Create Exchange Endpoint
# =============================================================================
@router.post("", response_model=ExchangeResponse)
async def create_exchange(
    request: ExchangeRequest,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_permission(Permission.BOOK_APPOINTMENT)),
) -> ExchangeResponse:
    """
    Create a new exchange trade booking.

    Validates, in order:
    - Slot date (delegated to ``validate_date_in_range``)
    - Direction is a valid ``TradeDirection`` value
    - EUR amount (in cents) is within the configured min/max and increment
    - Slot starts on a ``SLOT_DURATION_MINUTES`` boundary with no seconds
    - Slot is within admin availability
    - A cached price exists and is not stale

    The agreed price is the cached market price with the direction-specific
    premium applied, and the sats amount is derived from that agreed price.

    Raises:
        HTTPException 400: Invalid direction, amount, slot timing, or the
            slot is outside availability.
        HTTPException 503: No price is cached, or the cached price is stale.
        HTTPException 409: The insert violated a database integrity
            constraint (reported as the slot already being booked).
    """
    slot_date = request.slot_start.date()
    validate_date_in_range(slot_date, context="book")

    # Validate direction
    try:
        direction = TradeDirection(request.direction)
    except ValueError:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid direction: {request.direction}. Must be 'buy' or 'sell'.",
        ) from None

    # Validate EUR amount (request amount is in cents, limits are in euros)
    if request.eur_amount < EUR_TRADE_MIN * 100:
        raise HTTPException(
            status_code=400,
            detail=f"EUR amount must be at least €{EUR_TRADE_MIN}",
        )
    if request.eur_amount > EUR_TRADE_MAX * 100:
        raise HTTPException(
            status_code=400,
            detail=f"EUR amount must be at most €{EUR_TRADE_MAX}",
        )
    if request.eur_amount % (EUR_TRADE_INCREMENT * 100) != 0:
        raise HTTPException(
            status_code=400,
            detail=f"EUR amount must be a multiple of €{EUR_TRADE_INCREMENT}",
        )

    # Validate slot timing. The boundary is derived from the same constant the
    # error message quotes, so the check and the message cannot drift apart.
    if request.slot_start.minute % SLOT_DURATION_MINUTES != 0:
        raise HTTPException(
            status_code=400,
            detail=f"Slot must be on {SLOT_DURATION_MINUTES}-minute boundary",
        )
    if request.slot_start.second != 0 or request.slot_start.microsecond != 0:
        raise HTTPException(
            status_code=400,
            detail="Slot start time must not have seconds or microseconds",
        )

    # Verify slot falls within availability
    slot_start_time = request.slot_start.time()
    slot_end_dt = request.slot_start + timedelta(minutes=SLOT_DURATION_MINUTES)
    # NOTE(review): a slot ending exactly at midnight yields 00:00 here, which
    # any end_time satisfies — confirm whether such slots need special handling.
    slot_end_time = slot_end_dt.time()
    result = await db.execute(
        select(Availability)
        .where(
            and_(
                Availability.date == slot_date,
                Availability.start_time <= slot_start_time,
                Availability.end_time >= slot_end_time,
            )
        )
        # Only existence matters. Without the limit, two availability windows
        # covering the same slot would raise MultipleResultsFound (HTTP 500).
        .limit(1)
    )
    matching_availability = result.scalars().first()
    if not matching_availability:
        slot_str = request.slot_start.strftime("%Y-%m-%d %H:%M")
        raise HTTPException(
            status_code=400,
            detail=f"Selected slot at {slot_str} UTC is not available",
        )

    # Get and validate price
    cached_price = await get_latest_price(db)
    if cached_price is None:
        raise HTTPException(
            status_code=503,
            detail="Price data unavailable. Please try again later.",
        )
    if is_price_stale(cached_price.timestamp):
        raise HTTPException(
            status_code=503,
            detail="Price is stale. Please refresh and try again.",
        )

    # Calculate agreed price based on direction
    market_price = cached_price.price
    agreed_price = apply_premium_for_direction(
        market_price, PREMIUM_PERCENTAGE, direction
    )

    # Calculate sats amount based on agreed price
    sats_amount = calculate_sats_amount(request.eur_amount, agreed_price)

    # Create the exchange
    exchange = Exchange(
        user_id=current_user.id,
        slot_start=request.slot_start,
        slot_end=slot_end_dt,
        direction=direction,
        eur_amount=request.eur_amount,
        sats_amount=sats_amount,
        market_price_eur=market_price,
        agreed_price_eur=agreed_price,
        premium_percentage=PREMIUM_PERCENTAGE,
        status=ExchangeStatus.BOOKED,
    )
    db.add(exchange)
    try:
        await db.commit()
        await db.refresh(exchange)
    except IntegrityError:
        # Roll back so the session is usable again before reporting the clash.
        await db.rollback()
        raise HTTPException(
            status_code=409,
            detail="This slot has already been booked. Select another slot.",
        ) from None
    return _to_exchange_response(exchange, current_user.email)
# =============================================================================
# User's Exchanges Endpoints
# =============================================================================
# Router for the current user's own trades (listing and cancelling); every
# route declared on it is served under the /api/trades prefix.
trades_router = APIRouter(prefix="/api/trades", tags=["trades"])
@trades_router.get("", response_model=list[ExchangeResponse])
async def get_my_trades(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_permission(Permission.VIEW_OWN_APPOINTMENTS)),
) -> list[ExchangeResponse]:
    """List every exchange owned by the caller, latest slot start first."""
    stmt = (
        select(Exchange)
        .where(Exchange.user_id == current_user.id)
        .order_by(desc(Exchange.slot_start))
    )
    rows = (await db.execute(stmt)).scalars().all()
    # Every row belongs to the caller, so the email is passed explicitly and
    # the row's user relationship is never touched.
    owner_email = current_user.email
    return [_to_exchange_response(row, owner_email) for row in rows]
@trades_router.post("/{exchange_id}/cancel", response_model=ExchangeResponse)
async def cancel_my_trade(
    exchange_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_permission(Permission.CANCEL_OWN_APPOINTMENT)),
) -> ExchangeResponse:
    """
    Cancel a booked exchange that belongs to the caller.

    Marks the exchange as cancelled by the user and stamps ``cancelled_at``
    with the current UTC time. There is no restriction on how close to the
    slot the cancellation may happen.

    Raises:
        HTTPException 404: No exchange with this id exists.
        HTTPException 403: The exchange belongs to another user.
        HTTPException 400: The exchange is not in the BOOKED state.
    """
    # Load the exchange together with its user relationship.
    stmt = (
        select(Exchange)
        .options(joinedload(Exchange.user))
        .where(Exchange.id == exchange_id)
    )
    exchange = (await db.execute(stmt)).scalar_one_or_none()
    if not exchange:
        raise HTTPException(
            status_code=404,
            detail=f"Trade {exchange_id} not found",
        )

    # Only the owner may cancel.
    owned_by_caller = exchange.user_id == current_user.id
    if not owned_by_caller:
        raise HTTPException(
            status_code=403,
            detail="Cannot cancel another user's trade",
        )

    # Anything other than BOOKED cannot be cancelled.
    if exchange.status != ExchangeStatus.BOOKED:
        raise HTTPException(
            status_code=400,
            detail=f"Cannot cancel: status is '{exchange.status.value}'",
        )

    exchange.status = ExchangeStatus.CANCELLED_BY_USER
    exchange.cancelled_at = datetime.now(UTC)
    await db.commit()
    await db.refresh(exchange)
    return _to_exchange_response(exchange, current_user.email)
# All routers from this module for easy registration: the /api/exchange
# router followed by the /api/trades router.
routers = [router, trades_router]