arbret/backend/routes/audit.py

214 lines
6.3 KiB
Python
Raw Normal View History

2025-12-20 22:18:14 +01:00
"""Audit routes for viewing action records."""
from collections.abc import Callable
from typing import TypeVar
2025-12-20 22:18:14 +01:00
from fastapi import APIRouter, Depends, Query
from pydantic import BaseModel
from sqlalchemy import desc, func, select
2025-12-20 22:18:14 +01:00
from sqlalchemy.ext.asyncio import AsyncSession
from auth import require_permission
from database import get_db
from models import (
CounterRecord,
Permission,
PriceHistory,
RandomNumberOutcome,
SumRecord,
User,
)
from pagination import (
calculate_offset,
calculate_total_pages,
create_paginated_response,
)
from price_fetcher import fetch_btc_eur_price
2025-12-20 22:18:14 +01:00
from schemas import (
CounterRecordResponse,
PaginatedCounterRecords,
PaginatedSumRecords,
PriceHistoryResponse,
RandomNumberOutcomeResponse,
SumRecordResponse,
2025-12-20 22:18:14 +01:00
)
# Router for the audit endpoints; routes registered on it are prefixed with
# /api/audit and tagged "audit".
router = APIRouter(prefix="/api/audit", tags=["audit"])
# Type of the response object produced by a row mapper; must be a pydantic
# BaseModel subclass.
R = TypeVar("R", bound=BaseModel)
async def paginate_with_user_email(
    db: AsyncSession,
    model: type[SumRecord] | type[CounterRecord],
    page: int,
    per_page: int,
    row_mapper: Callable[..., R],
) -> tuple[list[R], int, int]:
    """
    Generic pagination helper for audit records that need user email.

    Args:
        db: Async session used to run the count query and the page query.
        model: Audit model to page through (SumRecord or CounterRecord).
        page: Page number; converted to a row offset by calculate_offset.
        per_page: Maximum number of rows fetched for the page.
        row_mapper: Called as row_mapper(record, email) for every fetched row
            to build the response object.

    Returns: (records, total, total_pages)
    """
    # Get total count
    # NOTE(review): this count does not apply the User join used by the page
    # query below, so the two agree only if every record has a matching user
    # row - confirm this is guaranteed by the schema.
    count_result = await db.execute(select(func.count(model.id)))
    total = count_result.scalar() or 0

    # Get paginated records with user email
    offset = calculate_offset(page, per_page)
    query = (
        select(model, User.email)
        .join(User, model.user_id == User.id)
        # created_at alone is not a total order: rows sharing a timestamp may
        # be returned in a different order on each query, so a record could
        # repeat or be skipped across pages. The id tiebreaker makes the
        # OFFSET/LIMIT window deterministic.
        .order_by(desc(model.created_at), desc(model.id))
        .offset(offset)
        .limit(per_page)
    )
    result = await db.execute(query)
    rows = result.all()
    records: list[R] = [row_mapper(record, email) for record, email in rows]
    return records, total, calculate_total_pages(total, per_page)
2025-12-20 22:18:14 +01:00
def _to_counter_record_response(
    record: CounterRecord, email: str
) -> CounterRecordResponse:
    """Build the API response for a counter record and its user's email."""
    fields = {
        "id": record.id,
        "user_email": email,
        "value_before": record.value_before,
        "value_after": record.value_after,
        "created_at": record.created_at,
    }
    return CounterRecordResponse(**fields)
def _to_sum_record_response(record: SumRecord, email: str) -> SumRecordResponse:
    """Build the API response for a sum record and its user's email."""
    fields = {
        "id": record.id,
        "user_email": email,
        "a": record.a,
        "b": record.b,
        "result": record.result,
        "created_at": record.created_at,
    }
    return SumRecordResponse(**fields)
@router.get("/counter", response_model=PaginatedCounterRecords)
async def get_counter_records(
    page: int = Query(1, ge=1),
    per_page: int = Query(10, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
    _current_user: User = Depends(require_permission(Permission.VIEW_AUDIT)),
) -> PaginatedCounterRecords:
    """Return one page of counter action records.

    Access is gated by the require_permission(Permission.VIEW_AUDIT)
    dependency; the resolved user itself is not used in the handler.
    """
    # The helper also returns the page count, which is not needed here.
    items, total_count, _unused_pages = await paginate_with_user_email(
        db=db,
        model=CounterRecord,
        page=page,
        per_page=per_page,
        row_mapper=_to_counter_record_response,
    )
    return create_paginated_response(items, total_count, page, per_page)
2025-12-20 22:18:14 +01:00
@router.get("/sum", response_model=PaginatedSumRecords)
async def get_sum_records(
    page: int = Query(1, ge=1),
    per_page: int = Query(10, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
    _current_user: User = Depends(require_permission(Permission.VIEW_AUDIT)),
) -> PaginatedSumRecords:
    """Return one page of sum action records.

    Access is gated by the require_permission(Permission.VIEW_AUDIT)
    dependency; the resolved user itself is not used in the handler.
    """
    # The helper also returns the page count, which is not needed here.
    items, total_count, _unused_pages = await paginate_with_user_email(
        db=db,
        model=SumRecord,
        page=page,
        per_page=per_page,
        row_mapper=_to_sum_record_response,
    )
    return create_paginated_response(items, total_count, page, per_page)
@router.get("/random-jobs", response_model=list[RandomNumberOutcomeResponse])
async def get_random_job_outcomes(
    db: AsyncSession = Depends(get_db),
    _current_user: User = Depends(require_permission(Permission.VIEW_AUDIT)),
) -> list[RandomNumberOutcomeResponse]:
    """Return every random number job outcome, ordered newest first.

    Each outcome is returned together with the email of the user that
    triggered it. No limit is applied to the query.
    """

    def to_response(
        outcome: RandomNumberOutcome, email: str
    ) -> RandomNumberOutcomeResponse:
        # Copy the stored outcome into the response schema, attaching the email.
        return RandomNumberOutcomeResponse(
            id=outcome.id,
            job_id=outcome.job_id,
            triggered_by_user_id=outcome.triggered_by_user_id,
            triggered_by_email=email,
            value=outcome.value,
            duration_ms=outcome.duration_ms,
            status=outcome.status,
            created_at=outcome.created_at,
        )

    # A single joined SELECT fetches the email with each outcome, so no
    # per-row user lookup is issued.
    stmt = select(RandomNumberOutcome, User.email).join(
        User, RandomNumberOutcome.triggered_by_user_id == User.id
    )
    stmt = stmt.order_by(desc(RandomNumberOutcome.created_at))
    outcome_rows = (await db.execute(stmt)).all()
    return [to_response(outcome, email) for outcome, email in outcome_rows]
# =============================================================================
# Price History Endpoints
# =============================================================================
# Maximum number of rows returned by the price-history listing query.
PRICE_HISTORY_LIMIT = 20
@router.get("/price-history", response_model=list[PriceHistoryResponse])
async def get_price_history(
    db: AsyncSession = Depends(get_db),
    _current_user: User = Depends(require_permission(Permission.VIEW_AUDIT)),
) -> list[PriceHistoryResponse]:
    """Return the most recent price history records, newest timestamp first.

    At most PRICE_HISTORY_LIMIT records are returned.
    """
    newest_first = desc(PriceHistory.timestamp)
    stmt = select(PriceHistory).order_by(newest_first).limit(PRICE_HISTORY_LIMIT)
    price_rows = (await db.execute(stmt)).scalars().all()

    def to_response(entry: PriceHistory) -> PriceHistoryResponse:
        # Copy the stored row into the response schema field by field.
        return PriceHistoryResponse(
            id=entry.id,
            source=entry.source,
            pair=entry.pair,
            price=entry.price,
            timestamp=entry.timestamp,
            created_at=entry.created_at,
        )

    return [to_response(entry) for entry in price_rows]
@router.post("/price-history/fetch", response_model=PriceHistoryResponse)
async def fetch_price_now(
    db: AsyncSession = Depends(get_db),
    _current_user: User = Depends(require_permission(Permission.VIEW_AUDIT)),
) -> PriceHistoryResponse:
    """Fetch the current BTC/EUR price, store it, and return the stored row.

    The new row is recorded with source "bitfinex" and pair "BTC/EUR".
    """
    fetched_price, fetched_at = await fetch_btc_eur_price()

    entry = PriceHistory(
        source="bitfinex",
        pair="BTC/EUR",
        price=fetched_price,
        timestamp=fetched_at,
    )
    db.add(entry)
    await db.commit()
    # Reload the row so attributes not set above (id, created_at) can be read;
    # presumably these are populated by the database - confirm in the model.
    await db.refresh(entry)

    response_fields = {
        "id": entry.id,
        "source": entry.source,
        "pair": entry.pair,
        "price": entry.price,
        "timestamp": entry.timestamp,
        "created_at": entry.created_at,
    }
    return PriceHistoryResponse(**response_fields)