When a duplicate timestamp occurs (rare but possible), return the existing record instead of failing with a 500 error. This matches the worker's ON CONFLICT DO NOTHING behavior. Added test for duplicate timestamp handling.
226 lines
6.7 KiB
Python
226 lines
6.7 KiB
Python
"""Audit routes for viewing action records."""
|
|
|
|
from collections.abc import Callable
|
|
from typing import TypeVar
|
|
|
|
from fastapi import APIRouter, Depends, Query
|
|
from pydantic import BaseModel
|
|
from sqlalchemy import desc, func, select
|
|
from sqlalchemy.exc import IntegrityError
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from auth import require_permission
|
|
from database import get_db
|
|
from models import (
|
|
CounterRecord,
|
|
Permission,
|
|
PriceHistory,
|
|
RandomNumberOutcome,
|
|
SumRecord,
|
|
User,
|
|
)
|
|
from pagination import (
|
|
calculate_offset,
|
|
calculate_total_pages,
|
|
create_paginated_response,
|
|
)
|
|
from price_fetcher import PAIR_BTC_EUR, SOURCE_BITFINEX, fetch_btc_eur_price
|
|
from schemas import (
|
|
CounterRecordResponse,
|
|
PaginatedCounterRecords,
|
|
PaginatedSumRecords,
|
|
PriceHistoryResponse,
|
|
RandomNumberOutcomeResponse,
|
|
SumRecordResponse,
|
|
)
|
|
|
|
router = APIRouter(prefix="/api/audit", tags=["audit"])
|
|
|
|
R = TypeVar("R", bound=BaseModel)
|
|
|
|
|
|
async def paginate_with_user_email(
|
|
db: AsyncSession,
|
|
model: type[SumRecord] | type[CounterRecord],
|
|
page: int,
|
|
per_page: int,
|
|
row_mapper: Callable[..., R],
|
|
) -> tuple[list[R], int, int]:
|
|
"""
|
|
Generic pagination helper for audit records that need user email.
|
|
|
|
Returns: (records, total, total_pages)
|
|
"""
|
|
# Get total count
|
|
count_result = await db.execute(select(func.count(model.id)))
|
|
total = count_result.scalar() or 0
|
|
|
|
# Get paginated records with user email
|
|
offset = calculate_offset(page, per_page)
|
|
query = (
|
|
select(model, User.email)
|
|
.join(User, model.user_id == User.id)
|
|
.order_by(desc(model.created_at))
|
|
.offset(offset)
|
|
.limit(per_page)
|
|
)
|
|
result = await db.execute(query)
|
|
rows = result.all()
|
|
|
|
records: list[R] = [row_mapper(record, email) for record, email in rows]
|
|
return records, total, calculate_total_pages(total, per_page)
|
|
|
|
|
|
def _to_counter_record_response(
|
|
record: CounterRecord, email: str
|
|
) -> CounterRecordResponse:
|
|
return CounterRecordResponse(
|
|
id=record.id,
|
|
user_email=email,
|
|
value_before=record.value_before,
|
|
value_after=record.value_after,
|
|
created_at=record.created_at,
|
|
)
|
|
|
|
|
|
def _to_sum_record_response(record: SumRecord, email: str) -> SumRecordResponse:
|
|
return SumRecordResponse(
|
|
id=record.id,
|
|
user_email=email,
|
|
a=record.a,
|
|
b=record.b,
|
|
result=record.result,
|
|
created_at=record.created_at,
|
|
)
|
|
|
|
|
|
@router.get("/counter", response_model=PaginatedCounterRecords)
|
|
async def get_counter_records(
|
|
page: int = Query(1, ge=1),
|
|
per_page: int = Query(10, ge=1, le=100),
|
|
db: AsyncSession = Depends(get_db),
|
|
_current_user: User = Depends(require_permission(Permission.VIEW_AUDIT)),
|
|
) -> PaginatedCounterRecords:
|
|
"""Get paginated counter action records."""
|
|
records, total, _ = await paginate_with_user_email(
|
|
db, CounterRecord, page, per_page, _to_counter_record_response
|
|
)
|
|
return create_paginated_response(records, total, page, per_page)
|
|
|
|
|
|
@router.get("/sum", response_model=PaginatedSumRecords)
|
|
async def get_sum_records(
|
|
page: int = Query(1, ge=1),
|
|
per_page: int = Query(10, ge=1, le=100),
|
|
db: AsyncSession = Depends(get_db),
|
|
_current_user: User = Depends(require_permission(Permission.VIEW_AUDIT)),
|
|
) -> PaginatedSumRecords:
|
|
"""Get paginated sum action records."""
|
|
records, total, _ = await paginate_with_user_email(
|
|
db, SumRecord, page, per_page, _to_sum_record_response
|
|
)
|
|
return create_paginated_response(records, total, page, per_page)
|
|
|
|
|
|
@router.get("/random-jobs", response_model=list[RandomNumberOutcomeResponse])
|
|
async def get_random_job_outcomes(
|
|
db: AsyncSession = Depends(get_db),
|
|
_current_user: User = Depends(require_permission(Permission.VIEW_AUDIT)),
|
|
) -> list[RandomNumberOutcomeResponse]:
|
|
"""Get all random number job outcomes, newest first."""
|
|
# Explicit join to avoid N+1 query
|
|
query = (
|
|
select(RandomNumberOutcome, User.email)
|
|
.join(User, RandomNumberOutcome.triggered_by_user_id == User.id)
|
|
.order_by(desc(RandomNumberOutcome.created_at))
|
|
)
|
|
result = await db.execute(query)
|
|
rows = result.all()
|
|
|
|
return [
|
|
RandomNumberOutcomeResponse(
|
|
id=outcome.id,
|
|
job_id=outcome.job_id,
|
|
triggered_by_user_id=outcome.triggered_by_user_id,
|
|
triggered_by_email=email,
|
|
value=outcome.value,
|
|
duration_ms=outcome.duration_ms,
|
|
status=outcome.status,
|
|
created_at=outcome.created_at,
|
|
)
|
|
for outcome, email in rows
|
|
]
|
|
|
|
|
|
# =============================================================================
|
|
# Price History Endpoints
|
|
# =============================================================================
|
|
|
|
PRICE_HISTORY_LIMIT = 20
|
|
|
|
|
|
@router.get("/price-history", response_model=list[PriceHistoryResponse])
|
|
async def get_price_history(
|
|
db: AsyncSession = Depends(get_db),
|
|
_current_user: User = Depends(require_permission(Permission.VIEW_AUDIT)),
|
|
) -> list[PriceHistoryResponse]:
|
|
"""Get the 20 most recent price history records."""
|
|
query = (
|
|
select(PriceHistory)
|
|
.order_by(desc(PriceHistory.timestamp))
|
|
.limit(PRICE_HISTORY_LIMIT)
|
|
)
|
|
result = await db.execute(query)
|
|
records = result.scalars().all()
|
|
|
|
return [
|
|
PriceHistoryResponse(
|
|
id=record.id,
|
|
source=record.source,
|
|
pair=record.pair,
|
|
price=record.price,
|
|
timestamp=record.timestamp,
|
|
created_at=record.created_at,
|
|
)
|
|
for record in records
|
|
]
|
|
|
|
|
|
@router.post("/price-history/fetch", response_model=PriceHistoryResponse)
|
|
async def fetch_price_now(
|
|
db: AsyncSession = Depends(get_db),
|
|
_current_user: User = Depends(require_permission(Permission.VIEW_AUDIT)),
|
|
) -> PriceHistoryResponse:
|
|
"""Manually trigger a price fetch from Bitfinex."""
|
|
price, timestamp = await fetch_btc_eur_price()
|
|
|
|
record = PriceHistory(
|
|
source=SOURCE_BITFINEX,
|
|
pair=PAIR_BTC_EUR,
|
|
price=price,
|
|
timestamp=timestamp,
|
|
)
|
|
db.add(record)
|
|
|
|
try:
|
|
await db.commit()
|
|
await db.refresh(record)
|
|
except IntegrityError:
|
|
# Duplicate timestamp - return the existing record
|
|
await db.rollback()
|
|
query = select(PriceHistory).where(
|
|
PriceHistory.source == SOURCE_BITFINEX,
|
|
PriceHistory.pair == PAIR_BTC_EUR,
|
|
PriceHistory.timestamp == timestamp,
|
|
)
|
|
result = await db.execute(query)
|
|
record = result.scalar_one()
|
|
|
|
return PriceHistoryResponse(
|
|
id=record.id,
|
|
source=record.source,
|
|
pair=record.pair,
|
|
price=record.price,
|
|
timestamp=record.timestamp,
|
|
created_at=record.created_at,
|
|
)
|