arbret/backend/routes/exchange.py

812 lines
26 KiB
Python
Raw Normal View History

"""Exchange routes for Bitcoin trading."""
from datetime import UTC, date, datetime, time, timedelta
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import and_, desc, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload
from auth import require_permission
from database import get_db
from date_validation import validate_date_in_range
from models import (
Availability,
Exchange,
ExchangeStatus,
Permission,
PriceHistory,
TradeDirection,
User,
)
from price_fetcher import PAIR_BTC_EUR, SOURCE_BITFINEX, fetch_btc_eur_price
from schemas import (
AdminExchangeResponse,
ExchangeRequest,
ExchangeResponse,
ExchangeUserContact,
)
from shared_constants import (
EUR_TRADE_INCREMENT,
EUR_TRADE_MAX,
EUR_TRADE_MIN,
PREMIUM_PERCENTAGE,
PRICE_STALENESS_SECONDS,
SLOT_DURATION_MINUTES,
)
router = APIRouter(prefix="/api/exchange", tags=["exchange"])
# =============================================================================
# Constants for satoshi calculations
# =============================================================================
SATS_PER_BTC = 100_000_000
# =============================================================================
# Pydantic models for price endpoint
# =============================================================================
class ExchangeConfigResponse(BaseModel):
"""Exchange configuration for the frontend."""
eur_min: int
eur_max: int
eur_increment: int
premium_percentage: int
class PriceResponse(BaseModel):
"""Current BTC/EUR price for trading.
Note: The actual agreed price depends on trade direction (buy/sell)
and is calculated by the frontend using market_price and premium_percentage.
"""
market_price: float # Raw price from exchange
premium_percentage: int
timestamp: datetime
is_stale: bool
class ExchangePriceResponse(BaseModel):
"""Combined price and configuration response."""
price: PriceResponse | None # None if price fetch failed
config: ExchangeConfigResponse
error: str | None = None
class BookableSlot(BaseModel):
"""A single bookable time slot."""
start_time: datetime
end_time: datetime
class AvailableSlotsResponse(BaseModel):
"""Response containing available slots for a date."""
date: date
slots: list[BookableSlot]
# =============================================================================
# Helper functions
# =============================================================================
def apply_premium_for_direction(
market_price: float,
premium_percentage: int,
direction: TradeDirection,
) -> float:
"""
Apply premium to market price based on trade direction.
The premium is always favorable to the admin:
- When user BUYS BTC: user pays MORE (market * (1 + premium/100))
- When user SELLS BTC: user receives LESS (market * (1 - premium/100))
"""
if direction == TradeDirection.BUY:
return market_price * (1 + premium_percentage / 100)
else: # SELL
return market_price * (1 - premium_percentage / 100)
def calculate_sats_amount(
eur_cents: int,
price_eur_per_btc: float,
) -> int:
"""
Calculate satoshi amount from EUR cents and price.
Args:
eur_cents: Amount in EUR cents (e.g., 10000 = 100)
price_eur_per_btc: Price in EUR per BTC
Returns:
Amount in satoshis
"""
eur_amount = eur_cents / 100
btc_amount = eur_amount / price_eur_per_btc
return int(btc_amount * SATS_PER_BTC)
async def get_latest_price(db: AsyncSession) -> PriceHistory | None:
"""Get the most recent price from the database."""
query = (
select(PriceHistory)
.where(
PriceHistory.source == SOURCE_BITFINEX, PriceHistory.pair == PAIR_BTC_EUR
)
.order_by(desc(PriceHistory.timestamp))
.limit(1)
)
result = await db.execute(query)
return result.scalar_one_or_none()
def is_price_stale(price_timestamp: datetime) -> bool:
"""Check if a price is older than the staleness threshold."""
age_seconds = (datetime.now(UTC) - price_timestamp).total_seconds()
return age_seconds > PRICE_STALENESS_SECONDS
def _to_exchange_response(
exchange: Exchange,
user_email: str | None = None,
) -> ExchangeResponse:
"""Convert an Exchange model to ExchangeResponse schema."""
email = user_email if user_email is not None else exchange.user.email
return ExchangeResponse(
id=exchange.id,
user_id=exchange.user_id,
user_email=email,
slot_start=exchange.slot_start,
slot_end=exchange.slot_end,
direction=exchange.direction.value,
eur_amount=exchange.eur_amount,
sats_amount=exchange.sats_amount,
market_price_eur=exchange.market_price_eur,
agreed_price_eur=exchange.agreed_price_eur,
premium_percentage=exchange.premium_percentage,
status=exchange.status.value,
created_at=exchange.created_at,
cancelled_at=exchange.cancelled_at,
completed_at=exchange.completed_at,
)
# =============================================================================
# Price Endpoint
# =============================================================================
@router.get("/price", response_model=ExchangePriceResponse)
async def get_exchange_price(
db: AsyncSession = Depends(get_db),
_current_user: User = Depends(require_permission(Permission.CREATE_EXCHANGE)),
) -> ExchangePriceResponse:
"""
Get the current BTC/EUR price for trading.
Returns the latest price from the database. If no price exists or the price
is stale, attempts to fetch a fresh price from Bitfinex.
The response includes:
- market_price: The raw price from the exchange
- premium_percentage: The premium to apply to trades
- is_stale: Whether the price is older than 5 minutes
- config: Trading configuration (min/max EUR, increment)
"""
config = ExchangeConfigResponse(
eur_min=EUR_TRADE_MIN,
eur_max=EUR_TRADE_MAX,
eur_increment=EUR_TRADE_INCREMENT,
premium_percentage=PREMIUM_PERCENTAGE,
)
# Try to get the latest cached price
cached_price = await get_latest_price(db)
# If no cached price or it's stale, try to fetch a new one
if cached_price is None or is_price_stale(cached_price.timestamp):
try:
price_value, timestamp = await fetch_btc_eur_price()
# Store the new price
new_price = PriceHistory(
source=SOURCE_BITFINEX,
pair=PAIR_BTC_EUR,
price=price_value,
timestamp=timestamp,
)
db.add(new_price)
await db.commit()
await db.refresh(new_price)
return ExchangePriceResponse(
price=PriceResponse(
market_price=price_value,
premium_percentage=PREMIUM_PERCENTAGE,
timestamp=timestamp,
is_stale=False,
),
config=config,
)
except Exception as e:
# If fetch fails and we have a cached price, return it with stale flag
if cached_price is not None:
return ExchangePriceResponse(
price=PriceResponse(
market_price=cached_price.price,
premium_percentage=PREMIUM_PERCENTAGE,
timestamp=cached_price.timestamp,
is_stale=True,
),
config=config,
error=f"Failed to fetch fresh price: {e}",
)
# No cached price and fetch failed
return ExchangePriceResponse(
price=None,
config=config,
error=f"Price unavailable: {e}",
)
# Return the cached price (not stale)
return ExchangePriceResponse(
price=PriceResponse(
market_price=cached_price.price,
premium_percentage=PREMIUM_PERCENTAGE,
timestamp=cached_price.timestamp,
is_stale=is_price_stale(cached_price.timestamp),
),
config=config,
)
# =============================================================================
# Available Slots Endpoint
# =============================================================================
def _expand_availability_to_slots(
avail: Availability, slot_date: date, booked_starts: set[datetime]
) -> list[BookableSlot]:
"""
Expand an availability block into individual slots, filtering out booked ones.
"""
slots: list[BookableSlot] = []
# Start from the availability's start time
current_start = datetime.combine(slot_date, avail.start_time, tzinfo=UTC)
avail_end = datetime.combine(slot_date, avail.end_time, tzinfo=UTC)
while current_start + timedelta(minutes=SLOT_DURATION_MINUTES) <= avail_end:
slot_end = current_start + timedelta(minutes=SLOT_DURATION_MINUTES)
# Only include if not already booked
if current_start not in booked_starts:
slots.append(BookableSlot(start_time=current_start, end_time=slot_end))
current_start = slot_end
return slots
@router.get("/slots", response_model=AvailableSlotsResponse)
async def get_available_slots(
date_param: date = Query(..., alias="date"),
db: AsyncSession = Depends(get_db),
_current_user: User = Depends(require_permission(Permission.CREATE_EXCHANGE)),
) -> AvailableSlotsResponse:
"""
Get available booking slots for a specific date.
Returns all slots that:
- Fall within admin-defined availability windows
- Are not already booked by another user
"""
validate_date_in_range(date_param, context="book")
# Get availability for the date
result = await db.execute(
select(Availability).where(Availability.date == date_param)
)
availabilities = result.scalars().all()
if not availabilities:
return AvailableSlotsResponse(date=date_param, slots=[])
# Get already booked slots for the date
date_start = datetime.combine(date_param, time.min, tzinfo=UTC)
date_end = datetime.combine(date_param, time.max, tzinfo=UTC)
result = await db.execute(
select(Exchange.slot_start).where(
and_(
Exchange.slot_start >= date_start,
Exchange.slot_start <= date_end,
Exchange.status == ExchangeStatus.BOOKED,
)
)
)
booked_starts = {row[0] for row in result.all()}
# Expand each availability into slots
all_slots: list[BookableSlot] = []
for avail in availabilities:
slots = _expand_availability_to_slots(avail, date_param, booked_starts)
all_slots.extend(slots)
# Sort by start time
all_slots.sort(key=lambda s: s.start_time)
return AvailableSlotsResponse(date=date_param, slots=all_slots)
# =============================================================================
# Create Exchange Endpoint
# =============================================================================
@router.post("", response_model=ExchangeResponse)
async def create_exchange(
request: ExchangeRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(require_permission(Permission.CREATE_EXCHANGE)),
) -> ExchangeResponse:
"""
Create a new exchange trade booking.
Validates:
- Slot is on a valid date and time boundary
- Slot is within admin availability
- Slot is not already booked
- Price is not stale
- EUR amount is within configured limits
"""
slot_date = request.slot_start.date()
validate_date_in_range(slot_date, context="book")
# Validate direction
try:
direction = TradeDirection(request.direction)
except ValueError:
raise HTTPException(
status_code=400,
detail=f"Invalid direction: {request.direction}. Must be 'buy' or 'sell'.",
) from None
# Validate EUR amount
if request.eur_amount < EUR_TRADE_MIN * 100:
raise HTTPException(
status_code=400,
detail=f"EUR amount must be at least €{EUR_TRADE_MIN}",
)
if request.eur_amount > EUR_TRADE_MAX * 100:
raise HTTPException(
status_code=400,
detail=f"EUR amount must be at most €{EUR_TRADE_MAX}",
)
if request.eur_amount % (EUR_TRADE_INCREMENT * 100) != 0:
raise HTTPException(
status_code=400,
detail=f"EUR amount must be a multiple of €{EUR_TRADE_INCREMENT}",
)
# Validate slot timing
valid_minutes = (0, 15, 30, 45)
if request.slot_start.minute not in valid_minutes:
raise HTTPException(
status_code=400,
detail=f"Slot must be on {SLOT_DURATION_MINUTES}-minute boundary",
)
if request.slot_start.second != 0 or request.slot_start.microsecond != 0:
raise HTTPException(
status_code=400,
detail="Slot start time must not have seconds or microseconds",
)
# Verify slot falls within availability
slot_start_time = request.slot_start.time()
slot_end_dt = request.slot_start + timedelta(minutes=SLOT_DURATION_MINUTES)
slot_end_time = slot_end_dt.time()
result = await db.execute(
select(Availability).where(
and_(
Availability.date == slot_date,
Availability.start_time <= slot_start_time,
Availability.end_time >= slot_end_time,
)
)
)
matching_availability = result.scalar_one_or_none()
if not matching_availability:
slot_str = request.slot_start.strftime("%Y-%m-%d %H:%M")
raise HTTPException(
status_code=400,
detail=f"Selected slot at {slot_str} UTC is not available",
)
# Get and validate price
cached_price = await get_latest_price(db)
if cached_price is None:
raise HTTPException(
status_code=503,
detail="Price data unavailable. Please try again later.",
)
if is_price_stale(cached_price.timestamp):
raise HTTPException(
status_code=503,
detail="Price is stale. Please refresh and try again.",
)
# Calculate agreed price based on direction
market_price = cached_price.price
agreed_price = apply_premium_for_direction(
market_price, PREMIUM_PERCENTAGE, direction
)
# Calculate sats amount based on agreed price
sats_amount = calculate_sats_amount(request.eur_amount, agreed_price)
# Create the exchange
exchange = Exchange(
user_id=current_user.id,
slot_start=request.slot_start,
slot_end=slot_end_dt,
direction=direction,
eur_amount=request.eur_amount,
sats_amount=sats_amount,
market_price_eur=market_price,
agreed_price_eur=agreed_price,
premium_percentage=PREMIUM_PERCENTAGE,
status=ExchangeStatus.BOOKED,
)
db.add(exchange)
try:
await db.commit()
await db.refresh(exchange)
except IntegrityError:
await db.rollback()
raise HTTPException(
status_code=409,
detail="This slot has already been booked. Select another slot.",
) from None
return _to_exchange_response(exchange, current_user.email)
# =============================================================================
# User's Exchanges Endpoints
# =============================================================================
trades_router = APIRouter(prefix="/api/trades", tags=["trades"])
@trades_router.get("", response_model=list[ExchangeResponse])
async def get_my_trades(
db: AsyncSession = Depends(get_db),
current_user: User = Depends(require_permission(Permission.VIEW_OWN_EXCHANGES)),
) -> list[ExchangeResponse]:
"""Get the current user's exchanges, sorted by date (newest first)."""
result = await db.execute(
select(Exchange)
.where(Exchange.user_id == current_user.id)
.order_by(Exchange.slot_start.desc())
)
exchanges = result.scalars().all()
return [_to_exchange_response(ex, current_user.email) for ex in exchanges]
@trades_router.post("/{exchange_id}/cancel", response_model=ExchangeResponse)
async def cancel_my_trade(
exchange_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(require_permission(Permission.CANCEL_OWN_EXCHANGE)),
) -> ExchangeResponse:
"""Cancel one of the current user's exchanges."""
# Get the exchange with eager loading of user relationship
result = await db.execute(
select(Exchange)
.options(joinedload(Exchange.user))
.where(Exchange.id == exchange_id)
)
exchange = result.scalar_one_or_none()
if not exchange:
raise HTTPException(
status_code=404,
detail=f"Trade {exchange_id} not found",
)
# Verify ownership
if exchange.user_id != current_user.id:
raise HTTPException(
status_code=403,
detail="Cannot cancel another user's trade",
)
# Check if already in a final state
if exchange.status != ExchangeStatus.BOOKED:
raise HTTPException(
status_code=400,
detail=f"Cannot cancel: status is '{exchange.status.value}'",
)
# Cancel the exchange (no time restriction per spec)
exchange.status = ExchangeStatus.CANCELLED_BY_USER
exchange.cancelled_at = datetime.now(UTC)
await db.commit()
await db.refresh(exchange)
return _to_exchange_response(exchange, current_user.email)
# =============================================================================
# Admin Exchanges Endpoints
# =============================================================================
admin_trades_router = APIRouter(prefix="/api/admin/trades", tags=["admin-trades"])
def _to_admin_exchange_response(exchange: Exchange) -> AdminExchangeResponse:
"""Convert an Exchange model to AdminExchangeResponse with user contact."""
user = exchange.user
return AdminExchangeResponse(
id=exchange.id,
user_id=exchange.user_id,
user_email=user.email,
user_contact=ExchangeUserContact(
email=user.email,
contact_email=user.contact_email,
telegram=user.telegram,
signal=user.signal,
nostr_npub=user.nostr_npub,
),
slot_start=exchange.slot_start,
slot_end=exchange.slot_end,
direction=exchange.direction.value,
eur_amount=exchange.eur_amount,
sats_amount=exchange.sats_amount,
market_price_eur=exchange.market_price_eur,
agreed_price_eur=exchange.agreed_price_eur,
premium_percentage=exchange.premium_percentage,
status=exchange.status.value,
created_at=exchange.created_at,
cancelled_at=exchange.cancelled_at,
completed_at=exchange.completed_at,
)
@admin_trades_router.get("/upcoming", response_model=list[AdminExchangeResponse])
async def get_upcoming_trades(
db: AsyncSession = Depends(get_db),
_current_user: User = Depends(require_permission(Permission.VIEW_ALL_EXCHANGES)),
) -> list[AdminExchangeResponse]:
"""Get all upcoming booked trades, sorted by slot time ascending."""
now = datetime.now(UTC)
result = await db.execute(
select(Exchange)
.options(joinedload(Exchange.user))
.where(
and_(
Exchange.slot_start > now,
Exchange.status == ExchangeStatus.BOOKED,
)
)
.order_by(Exchange.slot_start.asc())
)
exchanges = result.scalars().all()
return [_to_admin_exchange_response(ex) for ex in exchanges]
@admin_trades_router.get("/past", response_model=list[AdminExchangeResponse])
async def get_past_trades(
status: str | None = None,
start_date: date | None = None,
end_date: date | None = None,
user_search: str | None = None,
db: AsyncSession = Depends(get_db),
_current_user: User = Depends(require_permission(Permission.VIEW_ALL_EXCHANGES)),
) -> list[AdminExchangeResponse]:
"""
Get past trades with optional filters.
Filters:
- status: Filter by exchange status
- start_date, end_date: Filter by slot_start date range
- user_search: Search by user email (partial match)
"""
now = datetime.now(UTC)
# Start with base query for past trades (slot_start <= now OR not booked)
query = (
select(Exchange)
.options(joinedload(Exchange.user))
.where(
(Exchange.slot_start <= now) | (Exchange.status != ExchangeStatus.BOOKED)
)
)
# Apply status filter
if status:
try:
status_enum = ExchangeStatus(status)
query = query.where(Exchange.status == status_enum)
except ValueError:
raise HTTPException(
status_code=400,
detail=f"Invalid status: {status}",
) from None
# Apply date range filter
if start_date:
start_dt = datetime.combine(start_date, time.min, tzinfo=UTC)
query = query.where(Exchange.slot_start >= start_dt)
if end_date:
end_dt = datetime.combine(end_date, time.max, tzinfo=UTC)
query = query.where(Exchange.slot_start <= end_dt)
# Apply user search filter (join with User table)
if user_search:
query = query.join(Exchange.user).where(User.email.ilike(f"%{user_search}%"))
# Order by most recent first
query = query.order_by(Exchange.slot_start.desc())
result = await db.execute(query)
exchanges = result.scalars().all()
return [_to_admin_exchange_response(ex) for ex in exchanges]
@admin_trades_router.post(
"/{exchange_id}/complete", response_model=AdminExchangeResponse
)
async def complete_trade(
exchange_id: int,
db: AsyncSession = Depends(get_db),
_current_user: User = Depends(require_permission(Permission.CANCEL_ANY_EXCHANGE)),
) -> AdminExchangeResponse:
"""Mark a trade as completed. Only possible after slot time has passed."""
result = await db.execute(
select(Exchange)
.options(joinedload(Exchange.user))
.where(Exchange.id == exchange_id)
)
exchange = result.scalar_one_or_none()
if not exchange:
raise HTTPException(
status_code=404,
detail=f"Trade {exchange_id} not found",
)
# Check slot has passed
if exchange.slot_start > datetime.now(UTC):
raise HTTPException(
status_code=400,
detail="Cannot complete: trade slot has not yet started",
)
# Check status is BOOKED
if exchange.status != ExchangeStatus.BOOKED:
raise HTTPException(
status_code=400,
detail=f"Cannot complete: status is '{exchange.status.value}'",
)
exchange.status = ExchangeStatus.COMPLETED
exchange.completed_at = datetime.now(UTC)
await db.commit()
await db.refresh(exchange)
return _to_admin_exchange_response(exchange)
@admin_trades_router.post(
"/{exchange_id}/no-show", response_model=AdminExchangeResponse
)
async def mark_no_show(
exchange_id: int,
db: AsyncSession = Depends(get_db),
_current_user: User = Depends(require_permission(Permission.CANCEL_ANY_EXCHANGE)),
) -> AdminExchangeResponse:
"""Mark a trade as no-show. Only possible after slot time has passed."""
result = await db.execute(
select(Exchange)
.options(joinedload(Exchange.user))
.where(Exchange.id == exchange_id)
)
exchange = result.scalar_one_or_none()
if not exchange:
raise HTTPException(
status_code=404,
detail=f"Trade {exchange_id} not found",
)
# Check slot has passed
if exchange.slot_start > datetime.now(UTC):
raise HTTPException(
status_code=400,
detail="Cannot mark as no-show: trade slot has not yet started",
)
# Check status is BOOKED
if exchange.status != ExchangeStatus.BOOKED:
raise HTTPException(
status_code=400,
detail=f"Cannot mark as no-show: status is '{exchange.status.value}'",
)
exchange.status = ExchangeStatus.NO_SHOW
exchange.completed_at = datetime.now(UTC)
await db.commit()
await db.refresh(exchange)
return _to_admin_exchange_response(exchange)
@admin_trades_router.post("/{exchange_id}/cancel", response_model=AdminExchangeResponse)
async def admin_cancel_trade(
exchange_id: int,
db: AsyncSession = Depends(get_db),
_current_user: User = Depends(require_permission(Permission.CANCEL_ANY_EXCHANGE)),
) -> AdminExchangeResponse:
"""Cancel any trade (admin only)."""
result = await db.execute(
select(Exchange)
.options(joinedload(Exchange.user))
.where(Exchange.id == exchange_id)
)
exchange = result.scalar_one_or_none()
if not exchange:
raise HTTPException(
status_code=404,
detail=f"Trade {exchange_id} not found",
)
# Check status is BOOKED
if exchange.status != ExchangeStatus.BOOKED:
raise HTTPException(
status_code=400,
detail=f"Cannot cancel: status is '{exchange.status.value}'",
)
exchange.status = ExchangeStatus.CANCELLED_BY_ADMIN
exchange.cancelled_at = datetime.now(UTC)
await db.commit()
await db.refresh(exchange)
return _to_admin_exchange_response(exchange)
# All routers from this module for easy registration
routers = [router, trades_router, admin_trades_router]