arbret/backend/routes/audit.py
counterweight e7e3c97102
refactor(backend): standardize model-to-response conversion naming
Issue #8: Inconsistent naming for model-to-response conversion functions.

Changes:
- Rename build_invite_response to _to_invite_response (invites.py)
- Rename _map_counter_record to _to_counter_record_response (audit.py)
- Rename _map_sum_record to _to_sum_record_response (audit.py)

All conversion functions now follow the _to_X_response pattern,
using underscore prefix for module-private functions.
2025-12-22 09:16:05 +01:00

142 lines
4.4 KiB
Python

"""Audit routes for viewing action records."""
from collections.abc import Callable
from typing import TypeVar
from fastapi import APIRouter, Depends, Query
from pydantic import BaseModel
from sqlalchemy import desc, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from auth import require_permission
from database import get_db
from models import CounterRecord, Permission, RandomNumberOutcome, SumRecord, User
from pagination import (
calculate_offset,
calculate_total_pages,
create_paginated_response,
)
from schemas import (
CounterRecordResponse,
PaginatedCounterRecords,
PaginatedSumRecords,
RandomNumberOutcomeResponse,
SumRecordResponse,
)
# Router for the audit endpoints; every route below is served under /api/audit.
router = APIRouter(prefix="/api/audit", tags=["audit"])
# Response-schema type produced by a row mapper; constrained to pydantic models.
R = TypeVar("R", bound=BaseModel)
async def paginate_with_user_email(
    db: AsyncSession,
    model: type[SumRecord] | type[CounterRecord],
    page: int,
    per_page: int,
    row_mapper: Callable[..., R],
) -> tuple[list[R], int, int]:
    """
    Fetch one page of audit records together with each record's user email.

    Rows are returned newest first by ``created_at``. ``id`` is used as a
    secondary sort key so that records sharing a timestamp keep a fixed
    position; without it, OFFSET/LIMIT paging could repeat or skip such rows
    between page requests.

    Args:
        db: Async session used for both the count query and the page query.
        model: Audit model to page through (``SumRecord`` or ``CounterRecord``).
        page: Page number, forwarded to ``calculate_offset``.
        per_page: Maximum number of rows in the returned page.
        row_mapper: Called as ``row_mapper(record, email)`` for every row to
            build the response object.

    Returns: (records, total, total_pages)
    """
    # Total is counted on the model table alone (no join with User).
    count_result = await db.execute(select(func.count(model.id)))
    total = count_result.scalar() or 0

    # One joined query brings back each record with its user's email.
    offset = calculate_offset(page, per_page)
    query = (
        select(model, User.email)
        .join(User, model.user_id == User.id)
        # Tie-break on id: created_at alone does not give a unique ordering.
        .order_by(desc(model.created_at), desc(model.id))
        .offset(offset)
        .limit(per_page)
    )
    result = await db.execute(query)
    rows = result.all()

    records: list[R] = [row_mapper(record, email) for record, email in rows]
    return records, total, calculate_total_pages(total, per_page)
def _to_counter_record_response(
    record: CounterRecord,
    email: str,
) -> CounterRecordResponse:
    """Build the response schema for one counter record and its user's email."""
    response = CounterRecordResponse(
        id=record.id,
        user_email=email,
        value_before=record.value_before,
        value_after=record.value_after,
        created_at=record.created_at,
    )
    return response
def _to_sum_record_response(
    record: SumRecord,
    email: str,
) -> SumRecordResponse:
    """Build the response schema for one sum record and its user's email."""
    response = SumRecordResponse(
        id=record.id,
        user_email=email,
        a=record.a,
        b=record.b,
        result=record.result,
        created_at=record.created_at,
    )
    return response
@router.get("/counter", response_model=PaginatedCounterRecords)
async def get_counter_records(
    page: int = Query(1, ge=1),
    per_page: int = Query(10, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
    _current_user: User = Depends(require_permission(Permission.VIEW_AUDIT)),
) -> PaginatedCounterRecords:
    """Get paginated counter action records."""
    # The helper also returns total_pages; it is not used here because the
    # response builder is given total, page and per_page directly.
    items, total_count, _total_pages = await paginate_with_user_email(
        db=db,
        model=CounterRecord,
        page=page,
        per_page=per_page,
        row_mapper=_to_counter_record_response,
    )
    return create_paginated_response(items, total_count, page, per_page)
@router.get("/sum", response_model=PaginatedSumRecords)
async def get_sum_records(
    page: int = Query(1, ge=1),
    per_page: int = Query(10, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
    _current_user: User = Depends(require_permission(Permission.VIEW_AUDIT)),
) -> PaginatedSumRecords:
    """Get paginated sum action records."""
    # The helper also returns total_pages; it is not used here because the
    # response builder is given total, page and per_page directly.
    items, total_count, _total_pages = await paginate_with_user_email(
        db=db,
        model=SumRecord,
        page=page,
        per_page=per_page,
        row_mapper=_to_sum_record_response,
    )
    return create_paginated_response(items, total_count, page, per_page)
def _to_random_number_outcome_response(
    outcome: RandomNumberOutcome,
    email: str,
) -> RandomNumberOutcomeResponse:
    """Build the response schema for one outcome and its triggering user's email."""
    return RandomNumberOutcomeResponse(
        id=outcome.id,
        job_id=outcome.job_id,
        triggered_by_user_id=outcome.triggered_by_user_id,
        triggered_by_email=email,
        value=outcome.value,
        duration_ms=outcome.duration_ms,
        status=outcome.status,
        created_at=outcome.created_at,
    )


@router.get("/random-jobs", response_model=list[RandomNumberOutcomeResponse])
async def get_random_job_outcomes(
    db: AsyncSession = Depends(get_db),
    _current_user: User = Depends(require_permission(Permission.VIEW_AUDIT)),
) -> list[RandomNumberOutcomeResponse]:
    """Get all random number job outcomes, newest first."""
    # A single joined query returns the email with each outcome, so no
    # per-row user lookup is needed.
    newest_first = desc(RandomNumberOutcome.created_at)
    stmt = (
        select(RandomNumberOutcome, User.email)
        .join(User, RandomNumberOutcome.triggered_by_user_id == User.id)
        .order_by(newest_first)
    )
    rows = (await db.execute(stmt)).all()
    return [
        _to_random_number_outcome_response(outcome, email)
        for outcome, email in rows
    ]