arbret/backend/routes/exchange.py

950 lines
30 KiB
Python
Raw Normal View History

"""Exchange routes for Bitcoin trading."""
2025-12-23 17:03:51 +01:00
import uuid
from datetime import UTC, date, datetime, time, timedelta
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import and_, desc, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload
from auth import require_permission
from database import get_db
from date_validation import validate_date_in_range
from models import (
Availability,
BitcoinTransferMethod,
Exchange,
ExchangeStatus,
Permission,
PriceHistory,
TradeDirection,
User,
)
from price_fetcher import PAIR_BTC_EUR, SOURCE_BITFINEX, fetch_btc_eur_price
from schemas import (
AdminExchangeResponse,
ExchangeRequest,
ExchangeResponse,
ExchangeUserContact,
)
from shared_constants import (
EUR_TRADE_INCREMENT,
EUR_TRADE_MAX,
EUR_TRADE_MIN,
LIGHTNING_MAX_EUR,
PREMIUM_PERCENTAGE,
PRICE_STALENESS_SECONDS,
SLOT_DURATION_MINUTES,
)
router = APIRouter(prefix="/api/exchange", tags=["exchange"])
# =============================================================================
# Constants for satoshi calculations
# =============================================================================
SATS_PER_BTC = 100_000_000
# =============================================================================
# Pydantic models for price endpoint
# =============================================================================
class ExchangeConfigResponse(BaseModel):
"""Exchange configuration for the frontend."""
eur_min: int
eur_max: int
eur_increment: int
premium_percentage: int
class PriceResponse(BaseModel):
"""Current BTC/EUR price for trading.
Note: The actual agreed price depends on trade direction (buy/sell)
and is calculated by the frontend using market_price and premium_percentage.
"""
market_price: float # Raw price from exchange
premium_percentage: int
timestamp: datetime
is_stale: bool
class ExchangePriceResponse(BaseModel):
"""Combined price and configuration response."""
price: PriceResponse | None # None if price fetch failed
config: ExchangeConfigResponse
error: str | None = None
class BookableSlot(BaseModel):
"""A single bookable time slot."""
start_time: datetime
end_time: datetime
class AvailableSlotsResponse(BaseModel):
"""Response containing available slots for a date."""
date: date
slots: list[BookableSlot]
# =============================================================================
# Helper functions
# =============================================================================
def apply_premium_for_direction(
market_price: float,
premium_percentage: int,
direction: TradeDirection,
) -> float:
"""
Apply premium to market price based on trade direction.
The premium is always favorable to the admin:
- When user BUYS BTC: user pays MORE (market * (1 + premium/100))
- When user SELLS BTC: user receives LESS (market * (1 - premium/100))
"""
if direction == TradeDirection.BUY:
return market_price * (1 + premium_percentage / 100)
else: # SELL
return market_price * (1 - premium_percentage / 100)
def calculate_sats_amount(
eur_cents: int,
price_eur_per_btc: float,
) -> int:
"""
Calculate satoshi amount from EUR cents and price.
Args:
eur_cents: Amount in EUR cents (e.g., 10000 = 100)
price_eur_per_btc: Price in EUR per BTC
Returns:
Amount in satoshis
"""
eur_amount = eur_cents / 100
btc_amount = eur_amount / price_eur_per_btc
return int(btc_amount * SATS_PER_BTC)
async def get_latest_price(db: AsyncSession) -> PriceHistory | None:
"""Get the most recent price from the database."""
query = (
select(PriceHistory)
.where(
PriceHistory.source == SOURCE_BITFINEX, PriceHistory.pair == PAIR_BTC_EUR
)
.order_by(desc(PriceHistory.timestamp))
.limit(1)
)
result = await db.execute(query)
return result.scalar_one_or_none()
def is_price_stale(price_timestamp: datetime) -> bool:
"""Check if a price is older than the staleness threshold."""
age_seconds = (datetime.now(UTC) - price_timestamp).total_seconds()
return age_seconds > PRICE_STALENESS_SECONDS
def _to_exchange_response(
exchange: Exchange,
user_email: str | None = None,
) -> ExchangeResponse:
"""Convert an Exchange model to ExchangeResponse schema."""
email = user_email if user_email is not None else exchange.user.email
return ExchangeResponse(
id=exchange.id,
2025-12-23 17:03:51 +01:00
public_id=str(exchange.public_id),
user_id=exchange.user_id,
user_email=email,
slot_start=exchange.slot_start,
slot_end=exchange.slot_end,
direction=exchange.direction.value,
bitcoin_transfer_method=exchange.bitcoin_transfer_method.value,
eur_amount=exchange.eur_amount,
sats_amount=exchange.sats_amount,
market_price_eur=exchange.market_price_eur,
agreed_price_eur=exchange.agreed_price_eur,
premium_percentage=exchange.premium_percentage,
status=exchange.status.value,
created_at=exchange.created_at,
cancelled_at=exchange.cancelled_at,
completed_at=exchange.completed_at,
)
# =============================================================================
# Price Endpoint
# =============================================================================
@router.get("/price", response_model=ExchangePriceResponse)
async def get_exchange_price(
db: AsyncSession = Depends(get_db),
_current_user: User = Depends(require_permission(Permission.CREATE_EXCHANGE)),
) -> ExchangePriceResponse:
"""
Get the current BTC/EUR price for trading.
Returns the latest price from the database. If no price exists or the price
is stale, attempts to fetch a fresh price from Bitfinex.
The response includes:
- market_price: The raw price from the exchange
- premium_percentage: The premium to apply to trades
- is_stale: Whether the price is older than 5 minutes
- config: Trading configuration (min/max EUR, increment)
"""
config = ExchangeConfigResponse(
eur_min=EUR_TRADE_MIN,
eur_max=EUR_TRADE_MAX,
eur_increment=EUR_TRADE_INCREMENT,
premium_percentage=PREMIUM_PERCENTAGE,
)
# Try to get the latest cached price
cached_price = await get_latest_price(db)
# If no cached price or it's stale, try to fetch a new one
if cached_price is None or is_price_stale(cached_price.timestamp):
try:
price_value, timestamp = await fetch_btc_eur_price()
# Store the new price
new_price = PriceHistory(
source=SOURCE_BITFINEX,
pair=PAIR_BTC_EUR,
price=price_value,
timestamp=timestamp,
)
db.add(new_price)
await db.commit()
await db.refresh(new_price)
return ExchangePriceResponse(
price=PriceResponse(
market_price=price_value,
premium_percentage=PREMIUM_PERCENTAGE,
timestamp=timestamp,
is_stale=False,
),
config=config,
)
except Exception as e:
# If fetch fails and we have a cached price, return it with stale flag
if cached_price is not None:
return ExchangePriceResponse(
price=PriceResponse(
market_price=cached_price.price,
premium_percentage=PREMIUM_PERCENTAGE,
timestamp=cached_price.timestamp,
is_stale=True,
),
config=config,
error=f"Failed to fetch fresh price: {e}",
)
# No cached price and fetch failed
return ExchangePriceResponse(
price=None,
config=config,
error=f"Price unavailable: {e}",
)
# Return the cached price (not stale)
return ExchangePriceResponse(
price=PriceResponse(
market_price=cached_price.price,
premium_percentage=PREMIUM_PERCENTAGE,
timestamp=cached_price.timestamp,
is_stale=is_price_stale(cached_price.timestamp),
),
config=config,
)
# =============================================================================
# Available Slots Endpoint
# =============================================================================
def _expand_availability_to_slots(
avail: Availability, slot_date: date, booked_starts: set[datetime]
) -> list[BookableSlot]:
"""
Expand an availability block into individual slots, filtering out booked ones.
"""
slots: list[BookableSlot] = []
# Start from the availability's start time
current_start = datetime.combine(slot_date, avail.start_time, tzinfo=UTC)
avail_end = datetime.combine(slot_date, avail.end_time, tzinfo=UTC)
while current_start + timedelta(minutes=SLOT_DURATION_MINUTES) <= avail_end:
slot_end = current_start + timedelta(minutes=SLOT_DURATION_MINUTES)
# Only include if not already booked
if current_start not in booked_starts:
slots.append(BookableSlot(start_time=current_start, end_time=slot_end))
current_start = slot_end
return slots
@router.get("/slots", response_model=AvailableSlotsResponse)
async def get_available_slots(
date_param: date = Query(..., alias="date"),
db: AsyncSession = Depends(get_db),
_current_user: User = Depends(require_permission(Permission.CREATE_EXCHANGE)),
) -> AvailableSlotsResponse:
"""
Get available booking slots for a specific date.
Returns all slots that:
- Fall within admin-defined availability windows
- Are not already booked by another user
"""
validate_date_in_range(date_param, context="book")
# Get availability for the date
result = await db.execute(
select(Availability).where(Availability.date == date_param)
)
availabilities = result.scalars().all()
if not availabilities:
return AvailableSlotsResponse(date=date_param, slots=[])
# Get already booked slots for the date
date_start = datetime.combine(date_param, time.min, tzinfo=UTC)
date_end = datetime.combine(date_param, time.max, tzinfo=UTC)
result = await db.execute(
select(Exchange.slot_start).where(
and_(
Exchange.slot_start >= date_start,
Exchange.slot_start <= date_end,
Exchange.status == ExchangeStatus.BOOKED,
)
)
)
booked_starts = {row[0] for row in result.all()}
# Expand each availability into slots
all_slots: list[BookableSlot] = []
for avail in availabilities:
slots = _expand_availability_to_slots(avail, date_param, booked_starts)
all_slots.extend(slots)
# Sort by start time
all_slots.sort(key=lambda s: s.start_time)
return AvailableSlotsResponse(date=date_param, slots=all_slots)
# =============================================================================
# Create Exchange Endpoint
# =============================================================================
@router.post("", response_model=ExchangeResponse)
async def create_exchange(
request: ExchangeRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(require_permission(Permission.CREATE_EXCHANGE)),
) -> ExchangeResponse:
"""
Create a new exchange trade booking.
Validates:
- Slot is on a valid date and time boundary
- Slot is within admin availability
- Slot is not already booked
- Price is not stale
- EUR amount is within configured limits
"""
slot_date = request.slot_start.date()
validate_date_in_range(slot_date, context="book")
# Check if user already has a trade on this date
existing_trade_query = select(Exchange).where(
and_(
Exchange.user_id == current_user.id,
Exchange.slot_start >= datetime.combine(slot_date, time.min, tzinfo=UTC),
Exchange.slot_start
< datetime.combine(slot_date, time.max, tzinfo=UTC) + timedelta(days=1),
Exchange.status == ExchangeStatus.BOOKED,
)
)
existing_trade_result = await db.execute(existing_trade_query)
existing_trade = existing_trade_result.scalar_one_or_none()
if existing_trade:
raise HTTPException(
status_code=400,
detail=(
f"You already have a trade booked on {slot_date.strftime('%Y-%m-%d')}. "
2025-12-23 17:03:51 +01:00
f"Only one trade per day is allowed. "
f"Trade ID: {existing_trade.public_id}"
),
)
# Validate direction
try:
direction = TradeDirection(request.direction)
except ValueError:
raise HTTPException(
status_code=400,
detail=f"Invalid direction: {request.direction}. Must be 'buy' or 'sell'.",
) from None
# Validate bitcoin transfer method
try:
bitcoin_transfer_method = BitcoinTransferMethod(request.bitcoin_transfer_method)
except ValueError:
raise HTTPException(
status_code=400,
detail=(
f"Invalid bitcoin_transfer_method: {request.bitcoin_transfer_method}. "
"Must be 'onchain' or 'lightning'."
),
) from None
# Validate EUR amount
if request.eur_amount < EUR_TRADE_MIN * 100:
raise HTTPException(
status_code=400,
detail=f"EUR amount must be at least €{EUR_TRADE_MIN}",
)
if request.eur_amount > EUR_TRADE_MAX * 100:
raise HTTPException(
status_code=400,
detail=f"EUR amount must be at most €{EUR_TRADE_MAX}",
)
if request.eur_amount % (EUR_TRADE_INCREMENT * 100) != 0:
raise HTTPException(
status_code=400,
detail=f"EUR amount must be a multiple of €{EUR_TRADE_INCREMENT}",
)
# Validate Lightning threshold
if (
bitcoin_transfer_method == BitcoinTransferMethod.LIGHTNING
and request.eur_amount > LIGHTNING_MAX_EUR * 100
):
raise HTTPException(
status_code=400,
detail=(
f"Lightning payments are only allowed for amounts up to "
f"{LIGHTNING_MAX_EUR}. For amounts above €{LIGHTNING_MAX_EUR}, "
"please use onchain transactions."
),
)
# Validate slot timing - compute valid boundaries from slot duration
valid_minutes = tuple(range(0, 60, SLOT_DURATION_MINUTES))
if request.slot_start.minute not in valid_minutes:
raise HTTPException(
status_code=400,
detail=f"Slot must be on {SLOT_DURATION_MINUTES}-minute boundary",
)
if request.slot_start.second != 0 or request.slot_start.microsecond != 0:
raise HTTPException(
status_code=400,
detail="Slot start time must not have seconds or microseconds",
)
# Verify slot falls within availability
slot_start_time = request.slot_start.time()
slot_end_dt = request.slot_start + timedelta(minutes=SLOT_DURATION_MINUTES)
slot_end_time = slot_end_dt.time()
result = await db.execute(
select(Availability).where(
and_(
Availability.date == slot_date,
Availability.start_time <= slot_start_time,
Availability.end_time >= slot_end_time,
)
)
)
matching_availability = result.scalar_one_or_none()
if not matching_availability:
slot_str = request.slot_start.strftime("%Y-%m-%d %H:%M")
raise HTTPException(
status_code=400,
detail=f"Selected slot at {slot_str} UTC is not available",
)
# Get and validate price
cached_price = await get_latest_price(db)
if cached_price is None:
raise HTTPException(
status_code=503,
detail="Price data unavailable. Please try again later.",
)
if is_price_stale(cached_price.timestamp):
raise HTTPException(
status_code=503,
detail="Price is stale. Please refresh and try again.",
)
# Calculate agreed price based on direction
market_price = cached_price.price
agreed_price = apply_premium_for_direction(
market_price, PREMIUM_PERCENTAGE, direction
)
# Calculate sats amount based on agreed price
sats_amount = calculate_sats_amount(request.eur_amount, agreed_price)
2025-12-23 17:03:51 +01:00
# Check if slot is already booked (only consider BOOKED status, not cancelled)
slot_booked_query = select(Exchange).where(
and_(
Exchange.slot_start == request.slot_start,
Exchange.status == ExchangeStatus.BOOKED,
)
)
slot_booked_result = await db.execute(slot_booked_query)
slot_booked = slot_booked_result.scalar_one_or_none()
if slot_booked:
slot_str = request.slot_start.strftime("%Y-%m-%d %H:%M")
raise HTTPException(
status_code=409,
detail=(
f"This slot at {slot_str} UTC has already been booked. "
"Select another slot."
),
)
# Create the exchange
exchange = Exchange(
user_id=current_user.id,
slot_start=request.slot_start,
slot_end=slot_end_dt,
direction=direction,
bitcoin_transfer_method=bitcoin_transfer_method,
eur_amount=request.eur_amount,
sats_amount=sats_amount,
market_price_eur=market_price,
agreed_price_eur=agreed_price,
premium_percentage=PREMIUM_PERCENTAGE,
status=ExchangeStatus.BOOKED,
)
db.add(exchange)
try:
await db.commit()
await db.refresh(exchange)
2025-12-23 17:03:51 +01:00
except IntegrityError as e:
await db.rollback()
2025-12-23 17:03:51 +01:00
# This should rarely happen now since we check explicitly above,
# but keep it for other potential integrity violations
raise HTTPException(
status_code=409,
2025-12-23 17:03:51 +01:00
detail="Database constraint violation. Please try again.",
) from e
return _to_exchange_response(exchange, current_user.email)
# =============================================================================
# User's Exchanges Endpoints
# =============================================================================
trades_router = APIRouter(prefix="/api/trades", tags=["trades"])
@trades_router.get("", response_model=list[ExchangeResponse])
async def get_my_trades(
db: AsyncSession = Depends(get_db),
current_user: User = Depends(require_permission(Permission.VIEW_OWN_EXCHANGES)),
) -> list[ExchangeResponse]:
"""Get the current user's exchanges, sorted by date (newest first)."""
result = await db.execute(
select(Exchange)
.where(Exchange.user_id == current_user.id)
.order_by(Exchange.slot_start.desc())
)
exchanges = result.scalars().all()
return [_to_exchange_response(ex, current_user.email) for ex in exchanges]
2025-12-23 17:03:51 +01:00
@trades_router.get("/{public_id}", response_model=ExchangeResponse)
async def get_my_trade(
2025-12-23 17:03:51 +01:00
public_id: uuid.UUID,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(require_permission(Permission.VIEW_OWN_EXCHANGES)),
) -> ExchangeResponse:
2025-12-23 17:03:51 +01:00
"""Get a specific trade by public ID. User can only access their own trades."""
result = await db.execute(
select(Exchange).where(
2025-12-23 17:03:51 +01:00
and_(Exchange.public_id == public_id, Exchange.user_id == current_user.id)
)
)
exchange = result.scalar_one_or_none()
if not exchange:
raise HTTPException(
status_code=404,
detail="Trade not found or you don't have permission to view it.",
)
return _to_exchange_response(exchange, current_user.email)
2025-12-23 17:03:51 +01:00
@trades_router.post("/{public_id}/cancel", response_model=ExchangeResponse)
async def cancel_my_trade(
2025-12-23 17:03:51 +01:00
public_id: uuid.UUID,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(require_permission(Permission.CANCEL_OWN_EXCHANGE)),
) -> ExchangeResponse:
"""Cancel one of the current user's exchanges."""
# Get the exchange with eager loading of user relationship
result = await db.execute(
select(Exchange)
.options(joinedload(Exchange.user))
2025-12-23 17:03:51 +01:00
.where(Exchange.public_id == public_id)
)
exchange = result.scalar_one_or_none()
if not exchange:
raise HTTPException(
status_code=404,
2025-12-23 17:03:51 +01:00
detail="Trade not found",
)
# Verify ownership
if exchange.user_id != current_user.id:
raise HTTPException(
status_code=403,
detail="Cannot cancel another user's trade",
)
# Check if already in a final state
if exchange.status != ExchangeStatus.BOOKED:
raise HTTPException(
status_code=400,
detail=f"Cannot cancel: status is '{exchange.status.value}'",
)
# Check if slot time has already passed
if exchange.slot_start <= datetime.now(UTC):
raise HTTPException(
status_code=400,
detail="Cannot cancel: trade slot time has already passed",
)
exchange.status = ExchangeStatus.CANCELLED_BY_USER
exchange.cancelled_at = datetime.now(UTC)
await db.commit()
await db.refresh(exchange)
return _to_exchange_response(exchange, current_user.email)
# =============================================================================
# Admin Exchanges Endpoints
# =============================================================================
admin_trades_router = APIRouter(prefix="/api/admin/trades", tags=["admin-trades"])
def _to_admin_exchange_response(exchange: Exchange) -> AdminExchangeResponse:
"""Convert an Exchange model to AdminExchangeResponse with user contact."""
user = exchange.user
return AdminExchangeResponse(
id=exchange.id,
2025-12-23 17:03:51 +01:00
public_id=str(exchange.public_id),
user_id=exchange.user_id,
user_email=user.email,
user_contact=ExchangeUserContact(
email=user.email,
contact_email=user.contact_email,
telegram=user.telegram,
signal=user.signal,
nostr_npub=user.nostr_npub,
),
slot_start=exchange.slot_start,
slot_end=exchange.slot_end,
direction=exchange.direction.value,
bitcoin_transfer_method=exchange.bitcoin_transfer_method.value,
eur_amount=exchange.eur_amount,
sats_amount=exchange.sats_amount,
market_price_eur=exchange.market_price_eur,
agreed_price_eur=exchange.agreed_price_eur,
premium_percentage=exchange.premium_percentage,
status=exchange.status.value,
created_at=exchange.created_at,
cancelled_at=exchange.cancelled_at,
completed_at=exchange.completed_at,
)
@admin_trades_router.get("/upcoming", response_model=list[AdminExchangeResponse])
async def get_upcoming_trades(
db: AsyncSession = Depends(get_db),
_current_user: User = Depends(require_permission(Permission.VIEW_ALL_EXCHANGES)),
) -> list[AdminExchangeResponse]:
"""Get all upcoming booked trades, sorted by slot time ascending."""
now = datetime.now(UTC)
result = await db.execute(
select(Exchange)
.options(joinedload(Exchange.user))
.where(
and_(
Exchange.slot_start > now,
Exchange.status == ExchangeStatus.BOOKED,
)
)
.order_by(Exchange.slot_start.asc())
)
exchanges = result.scalars().all()
return [_to_admin_exchange_response(ex) for ex in exchanges]
@admin_trades_router.get("/past", response_model=list[AdminExchangeResponse])
async def get_past_trades(
status: str | None = None,
start_date: date | None = None,
end_date: date | None = None,
user_search: str | None = None,
db: AsyncSession = Depends(get_db),
_current_user: User = Depends(require_permission(Permission.VIEW_ALL_EXCHANGES)),
) -> list[AdminExchangeResponse]:
"""
Get past trades with optional filters.
Filters:
- status: Filter by exchange status
- start_date, end_date: Filter by slot_start date range
- user_search: Search by user email (partial match)
"""
now = datetime.now(UTC)
# Start with base query for past trades (slot_start <= now OR not booked)
query = (
select(Exchange)
.options(joinedload(Exchange.user))
.where(
(Exchange.slot_start <= now) | (Exchange.status != ExchangeStatus.BOOKED)
)
)
# Apply status filter
if status:
try:
status_enum = ExchangeStatus(status)
query = query.where(Exchange.status == status_enum)
except ValueError:
raise HTTPException(
status_code=400,
detail=f"Invalid status: {status}",
) from None
# Apply date range filter
if start_date:
start_dt = datetime.combine(start_date, time.min, tzinfo=UTC)
query = query.where(Exchange.slot_start >= start_dt)
if end_date:
end_dt = datetime.combine(end_date, time.max, tzinfo=UTC)
query = query.where(Exchange.slot_start <= end_dt)
# Apply user search filter (join with User table)
if user_search:
query = query.join(Exchange.user).where(User.email.ilike(f"%{user_search}%"))
# Order by most recent first
query = query.order_by(Exchange.slot_start.desc())
result = await db.execute(query)
exchanges = result.scalars().all()
return [_to_admin_exchange_response(ex) for ex in exchanges]
2025-12-23 17:03:51 +01:00
@admin_trades_router.post("/{public_id}/complete", response_model=AdminExchangeResponse)
async def complete_trade(
2025-12-23 17:03:51 +01:00
public_id: uuid.UUID,
db: AsyncSession = Depends(get_db),
_current_user: User = Depends(require_permission(Permission.COMPLETE_EXCHANGE)),
) -> AdminExchangeResponse:
"""Mark a trade as completed. Only possible after slot time has passed."""
result = await db.execute(
select(Exchange)
.options(joinedload(Exchange.user))
2025-12-23 17:03:51 +01:00
.where(Exchange.public_id == public_id)
)
exchange = result.scalar_one_or_none()
if not exchange:
raise HTTPException(
status_code=404,
2025-12-23 17:03:51 +01:00
detail="Trade not found",
)
# Check slot has passed
if exchange.slot_start > datetime.now(UTC):
raise HTTPException(
status_code=400,
detail="Cannot complete: trade slot has not yet started",
)
# Check status is BOOKED
if exchange.status != ExchangeStatus.BOOKED:
raise HTTPException(
status_code=400,
detail=f"Cannot complete: status is '{exchange.status.value}'",
)
exchange.status = ExchangeStatus.COMPLETED
exchange.completed_at = datetime.now(UTC)
await db.commit()
await db.refresh(exchange)
return _to_admin_exchange_response(exchange)
2025-12-23 17:03:51 +01:00
@admin_trades_router.post("/{public_id}/no-show", response_model=AdminExchangeResponse)
async def mark_no_show(
2025-12-23 17:03:51 +01:00
public_id: uuid.UUID,
db: AsyncSession = Depends(get_db),
_current_user: User = Depends(require_permission(Permission.COMPLETE_EXCHANGE)),
) -> AdminExchangeResponse:
"""Mark a trade as no-show. Only possible after slot time has passed."""
result = await db.execute(
select(Exchange)
.options(joinedload(Exchange.user))
2025-12-23 17:03:51 +01:00
.where(Exchange.public_id == public_id)
)
exchange = result.scalar_one_or_none()
if not exchange:
raise HTTPException(
status_code=404,
2025-12-23 17:03:51 +01:00
detail="Trade not found",
)
# Check slot has passed
if exchange.slot_start > datetime.now(UTC):
raise HTTPException(
status_code=400,
detail="Cannot mark as no-show: trade slot has not yet started",
)
# Check status is BOOKED
if exchange.status != ExchangeStatus.BOOKED:
raise HTTPException(
status_code=400,
detail=f"Cannot mark as no-show: status is '{exchange.status.value}'",
)
exchange.status = ExchangeStatus.NO_SHOW
exchange.completed_at = datetime.now(UTC)
await db.commit()
await db.refresh(exchange)
return _to_admin_exchange_response(exchange)
2025-12-23 17:03:51 +01:00
@admin_trades_router.post("/{public_id}/cancel", response_model=AdminExchangeResponse)
async def admin_cancel_trade(
2025-12-23 17:03:51 +01:00
public_id: uuid.UUID,
db: AsyncSession = Depends(get_db),
_current_user: User = Depends(require_permission(Permission.CANCEL_ANY_EXCHANGE)),
) -> AdminExchangeResponse:
"""Cancel any trade (admin only)."""
result = await db.execute(
select(Exchange)
.options(joinedload(Exchange.user))
2025-12-23 17:03:51 +01:00
.where(Exchange.public_id == public_id)
)
exchange = result.scalar_one_or_none()
if not exchange:
raise HTTPException(
status_code=404,
2025-12-23 17:03:51 +01:00
detail="Trade not found",
)
# Check status is BOOKED
if exchange.status != ExchangeStatus.BOOKED:
raise HTTPException(
status_code=400,
detail=f"Cannot cancel: status is '{exchange.status.value}'",
)
exchange.status = ExchangeStatus.CANCELLED_BY_ADMIN
exchange.cancelled_at = datetime.now(UTC)
await db.commit()
await db.refresh(exchange)
return _to_admin_exchange_response(exchange)
# =============================================================================
# Admin User Search Endpoint
# =============================================================================
admin_users_router = APIRouter(prefix="/api/admin/users", tags=["admin-users"])
class UserSearchResult(BaseModel):
"""Result item for user search."""
id: int
email: str
@admin_users_router.get("/search", response_model=list[UserSearchResult])
async def search_users(
q: str = Query(..., min_length=1, description="Search query for user email"),
db: AsyncSession = Depends(get_db),
_current_user: User = Depends(require_permission(Permission.VIEW_ALL_EXCHANGES)),
) -> list[UserSearchResult]:
"""
Search users by email for autocomplete.
Returns users whose email contains the search query (case-insensitive).
Limited to 10 results for autocomplete purposes.
"""
result = await db.execute(
select(User).where(User.email.ilike(f"%{q}%")).order_by(User.email).limit(10)
)
users = result.scalars().all()
return [UserSearchResult(id=u.id, email=u.email) for u in users]
# All routers from this module for easy registration
routers = [router, trades_router, admin_trades_router, admin_users_router]