Add SOURCE_BITFINEX and PAIR_BTC_EUR constants in price_fetcher.py and use them consistently in routes/audit.py, worker.py, and tests.
213 lines
6.3 KiB
Python
213 lines
6.3 KiB
Python
"""Audit routes for viewing action records."""
|
|
|
|
from collections.abc import Callable
|
|
from typing import TypeVar
|
|
|
|
from fastapi import APIRouter, Depends, Query
|
|
from pydantic import BaseModel
|
|
from sqlalchemy import desc, func, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from auth import require_permission
|
|
from database import get_db
|
|
from models import (
|
|
CounterRecord,
|
|
Permission,
|
|
PriceHistory,
|
|
RandomNumberOutcome,
|
|
SumRecord,
|
|
User,
|
|
)
|
|
from pagination import (
|
|
calculate_offset,
|
|
calculate_total_pages,
|
|
create_paginated_response,
|
|
)
|
|
from price_fetcher import PAIR_BTC_EUR, SOURCE_BITFINEX, fetch_btc_eur_price
|
|
from schemas import (
|
|
CounterRecordResponse,
|
|
PaginatedCounterRecords,
|
|
PaginatedSumRecords,
|
|
PriceHistoryResponse,
|
|
RandomNumberOutcomeResponse,
|
|
SumRecordResponse,
|
|
)
|
|
|
|
# Router for the audit endpoints; every route below is mounted under /api/audit.
router = APIRouter(prefix="/api/audit", tags=["audit"])
|
|
|
|
# Response-schema type produced by a row mapper (any pydantic BaseModel subclass).
R = TypeVar("R", bound=BaseModel)
|
|
|
|
|
|
async def paginate_with_user_email(
    db: AsyncSession,
    model: type[SumRecord] | type[CounterRecord],
    page: int,
    per_page: int,
    row_mapper: Callable[..., R],
) -> tuple[list[R], int, int]:
    """
    Fetch one page of audit records together with the acting user's email.

    Args:
        db: Async session used to run both the count and the page query.
        model: Audit model to page through; it must expose ``id``,
            ``user_id`` and ``created_at`` columns.
        page: Page number, forwarded to ``calculate_offset``.
        per_page: Maximum number of rows on the page.
        row_mapper: Callable invoked as ``row_mapper(record, email)`` for
            every row to build the response object.

    Returns: (records, total, total_pages)
    """
    # Total number of rows of this model (counted without the user join).
    count_result = await db.execute(select(func.count(model.id)))
    total = count_result.scalar() or 0

    # Newest first. ``id`` is a tie-breaker: without a unique sort key, rows
    # sharing the same ``created_at`` have no defined order, so OFFSET/LIMIT
    # could repeat or skip them from one page to the next.
    offset = calculate_offset(page, per_page)
    query = (
        select(model, User.email)
        .join(User, model.user_id == User.id)
        .order_by(desc(model.created_at), desc(model.id))
        .offset(offset)
        .limit(per_page)
    )
    result = await db.execute(query)
    rows = result.all()

    records: list[R] = [row_mapper(record, email) for record, email in rows]
    return records, total, calculate_total_pages(total, per_page)
|
|
|
|
|
|
def _to_counter_record_response(
    record: CounterRecord, email: str
) -> CounterRecordResponse:
    """Build the response schema for a counter record and its user's email."""
    payload = {
        "id": record.id,
        "user_email": email,
        "value_before": record.value_before,
        "value_after": record.value_after,
        "created_at": record.created_at,
    }
    return CounterRecordResponse(**payload)
|
|
|
|
|
|
def _to_sum_record_response(record: SumRecord, email: str) -> SumRecordResponse:
    """Build the response schema for a sum record and its user's email."""
    payload = {
        "id": record.id,
        "user_email": email,
        "a": record.a,
        "b": record.b,
        "result": record.result,
        "created_at": record.created_at,
    }
    return SumRecordResponse(**payload)
|
|
|
|
|
|
@router.get("/counter", response_model=PaginatedCounterRecords)
async def get_counter_records(
    page: int = Query(1, ge=1),
    per_page: int = Query(10, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
    _current_user: User = Depends(require_permission(Permission.VIEW_AUDIT)),
) -> PaginatedCounterRecords:
    """Return one page of counter action records (requires VIEW_AUDIT)."""
    page_data = await paginate_with_user_email(
        db=db,
        model=CounterRecord,
        page=page,
        per_page=per_page,
        row_mapper=_to_counter_record_response,
    )
    # The third element (total pages) is not needed here: the response is
    # built from the total, page and per_page values below.
    items, item_count = page_data[0], page_data[1]
    return create_paginated_response(items, item_count, page, per_page)
|
|
|
|
|
|
@router.get("/sum", response_model=PaginatedSumRecords)
async def get_sum_records(
    page: int = Query(1, ge=1),
    per_page: int = Query(10, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
    _current_user: User = Depends(require_permission(Permission.VIEW_AUDIT)),
) -> PaginatedSumRecords:
    """Return one page of sum action records (requires VIEW_AUDIT)."""
    page_data = await paginate_with_user_email(
        db=db,
        model=SumRecord,
        page=page,
        per_page=per_page,
        row_mapper=_to_sum_record_response,
    )
    # The third element (total pages) is not needed here: the response is
    # built from the total, page and per_page values below.
    items, item_count = page_data[0], page_data[1]
    return create_paginated_response(items, item_count, page, per_page)
|
|
|
|
|
|
@router.get("/random-jobs", response_model=list[RandomNumberOutcomeResponse])
async def get_random_job_outcomes(
    db: AsyncSession = Depends(get_db),
    _current_user: User = Depends(require_permission(Permission.VIEW_AUDIT)),
) -> list[RandomNumberOutcomeResponse]:
    """Return every random number job outcome, most recent first."""

    def _as_response(
        outcome: RandomNumberOutcome, email: str
    ) -> RandomNumberOutcomeResponse:
        # Copy the stored outcome plus the triggering user's email into the schema.
        return RandomNumberOutcomeResponse(
            id=outcome.id,
            job_id=outcome.job_id,
            triggered_by_user_id=outcome.triggered_by_user_id,
            triggered_by_email=email,
            value=outcome.value,
            duration_ms=outcome.duration_ms,
            status=outcome.status,
            created_at=outcome.created_at,
        )

    # Join the user table up front so the email arrives with each outcome row
    # instead of being looked up per record.
    stmt = select(RandomNumberOutcome, User.email)
    stmt = stmt.join(User, RandomNumberOutcome.triggered_by_user_id == User.id)
    stmt = stmt.order_by(desc(RandomNumberOutcome.created_at))

    fetched = await db.execute(stmt)
    return [_as_response(outcome, email) for outcome, email in fetched.all()]
|
|
|
|
|
|
# =============================================================================
|
|
# Price History Endpoints
|
|
# =============================================================================
|
|
|
|
# Maximum number of rows returned by the price-history listing endpoint.
PRICE_HISTORY_LIMIT = 20
|
|
|
|
|
|
@router.get("/price-history", response_model=list[PriceHistoryResponse])
async def get_price_history(
    db: AsyncSession = Depends(get_db),
    _current_user: User = Depends(require_permission(Permission.VIEW_AUDIT)),
) -> list[PriceHistoryResponse]:
    """Return the latest price history records, capped at PRICE_HISTORY_LIMIT."""
    # Newest timestamp first, truncated to the configured limit.
    stmt = select(PriceHistory)
    stmt = stmt.order_by(desc(PriceHistory.timestamp))
    stmt = stmt.limit(PRICE_HISTORY_LIMIT)

    fetched = await db.execute(stmt)

    history: list[PriceHistoryResponse] = []
    for entry in fetched.scalars().all():
        history.append(
            PriceHistoryResponse(
                id=entry.id,
                source=entry.source,
                pair=entry.pair,
                price=entry.price,
                timestamp=entry.timestamp,
                created_at=entry.created_at,
            )
        )
    return history
|
|
|
|
|
|
@router.post("/price-history/fetch", response_model=PriceHistoryResponse)
async def fetch_price_now(
    db: AsyncSession = Depends(get_db),
    _current_user: User = Depends(require_permission(Permission.VIEW_AUDIT)),
) -> PriceHistoryResponse:
    """Fetch the current BTC/EUR price on demand, store it and return the new row."""
    fetched = await fetch_btc_eur_price()
    price, timestamp = fetched

    # Persist the sample tagged with the shared source/pair constants.
    new_row = PriceHistory(
        source=SOURCE_BITFINEX,
        pair=PAIR_BTC_EUR,
        price=price,
        timestamp=timestamp,
    )
    db.add(new_row)
    await db.commit()
    # Reload the row after the commit so the attributes read below are current.
    await db.refresh(new_row)

    payload = {
        "id": new_row.id,
        "source": new_row.source,
        "pair": new_row.pair,
        "price": new_row.price,
        "timestamp": new_row.timestamp,
        "created_at": new_row.created_at,
    }
    return PriceHistoryResponse(**payload)
|