arbret/backend/routes/availability.py

201 lines
7.2 KiB
Python
Raw Normal View History

"""Availability routes for admin to manage booking availability."""
from datetime import date, timedelta
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import select, delete, and_
from sqlalchemy.ext.asyncio import AsyncSession
from auth import require_permission
from database import get_db
from models import User, Availability, Permission
from schemas import (
TimeSlot,
AvailabilityDay,
AvailabilityResponse,
SetAvailabilityRequest,
CopyAvailabilityRequest,
)
from shared_constants import MIN_ADVANCE_DAYS, MAX_ADVANCE_DAYS
# Router for the availability-management endpoints; every route declared
# below is served under the /api/admin/availability prefix.
router = APIRouter(prefix="/api/admin/availability", tags=["availability"])
def _get_date_range_bounds() -> tuple[date, date]:
    """Return the ``(earliest, latest)`` dates for which availability may be set.

    Both bounds are offsets from today's date: ``MIN_ADVANCE_DAYS`` days
    ahead for the earliest date and ``MAX_ADVANCE_DAYS`` days ahead for the
    latest.
    """
    reference_day = date.today()
    earliest, latest = (
        reference_day + timedelta(days=offset)
        for offset in (MIN_ADVANCE_DAYS, MAX_ADVANCE_DAYS)
    )
    return earliest, latest
def _validate_date_in_range(d: date, min_date: date, max_date: date) -> None:
    """Ensure ``d`` lies within ``[min_date, max_date]`` (both inclusive).

    Args:
        d: The date to check.
        min_date: Earliest acceptable date.
        max_date: Latest acceptable date.

    Raises:
        HTTPException: With status 400 when ``d`` is earlier than
            ``min_date`` or later than ``max_date``.
    """
    if min_date <= d <= max_date:
        return

    # The "too early" case takes precedence over the "too late" case.
    if d < min_date:
        message = f"Cannot set availability for past dates. Earliest allowed: {min_date}"
    else:
        message = f"Cannot set availability more than {MAX_ADVANCE_DAYS} days ahead. Latest allowed: {max_date}"
    raise HTTPException(status_code=400, detail=message)
@router.get("", response_model=AvailabilityResponse)
async def get_availability(
    from_date: date = Query(..., alias="from", description="Start date (inclusive)"),
    to_date: date = Query(..., alias="to", description="End date (inclusive)"),
    db: AsyncSession = Depends(get_db),
    _current_user: User = Depends(require_permission(Permission.MANAGE_AVAILABILITY)),
) -> AvailabilityResponse:
    """Return the stored availability slots between two dates, grouped by day.

    Args:
        from_date: First date of the range (inclusive), query parameter ``from``.
        to_date: Last date of the range (inclusive), query parameter ``to``.
        db: Database session dependency.
        _current_user: Dependency result of the ``MANAGE_AVAILABILITY``
            permission check; not used in the body.

    Returns:
        One ``AvailabilityDay`` per date that has at least one slot, in
        ascending date order. Dates with no slots are omitted.

    Raises:
        HTTPException: With status 400 when ``from_date`` is after ``to_date``.
    """
    if to_date < from_date:
        raise HTTPException(
            status_code=400,
            detail="'from' date must be before or equal to 'to' date",
        )

    # Fetch every slot in the inclusive range, ordered by day then start time.
    within_range = and_(Availability.date >= from_date, Availability.date <= to_date)
    query = (
        select(Availability)
        .where(within_range)
        .order_by(Availability.date, Availability.start_time)
    )
    rows = (await db.execute(query)).scalars().all()

    # Bucket the slots per calendar day.
    slots_by_day: dict[date, list[TimeSlot]] = {}
    for row in rows:
        slots_by_day.setdefault(row.date, []).append(
            TimeSlot(start_time=row.start_time, end_time=row.end_time)
        )

    return AvailabilityResponse(
        days=[
            AvailabilityDay(date=day, slots=slots_by_day[day])
            for day in sorted(slots_by_day)
        ]
    )
@router.put("", response_model=AvailabilityDay)
async def set_availability(
    request: SetAvailabilityRequest,
    db: AsyncSession = Depends(get_db),
    _current_user: User = Depends(require_permission(Permission.MANAGE_AVAILABILITY)),
) -> AvailabilityDay:
    """Set availability for a specific date, replacing any existing availability.

    Args:
        request: The target date and the time slots to store for it.
        db: Database session dependency.
        _current_user: Dependency result of the ``MANAGE_AVAILABILITY``
            permission check; not used in the body.

    Returns:
        The date together with the slots exactly as supplied in the request.

    Raises:
        HTTPException: With status 400 when the date is outside the allowed
            range, when a slot's end time is not after its start time, or
            when two slots overlap.
    """
    min_date, max_date = _get_date_range_bounds()
    _validate_date_in_range(request.date, min_date, max_date)

    # Check each slot on its own first, so that an inverted or empty slot is
    # reported as such and the overlap check below only sees well-formed slots.
    for slot in request.slots:
        if slot.end_time <= slot.start_time:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid time slot on {request.date}: end time {slot.end_time} must be after start time {slot.start_time}. Please correct the time range.",
            )

    # With slots ordered by start time, any overlap shows up between
    # neighbouring slots. Slots that merely touch (end == next start) are fine.
    ordered_slots = sorted(request.slots, key=lambda s: s.start_time)
    for earlier, later in zip(ordered_slots, ordered_slots[1:]):
        if earlier.end_time > later.start_time:
            raise HTTPException(
                status_code=400,
                detail=f"Time slots overlap on {request.date}: slot ending at {earlier.end_time} overlaps with slot starting at {later.start_time}. Please ensure all time slots are non-overlapping.",
            )

    # Replace the day's slots: the delete and the inserts are committed
    # together, and rolled back together if anything fails.
    try:
        await db.execute(
            delete(Availability).where(Availability.date == request.date)
        )
        for slot in request.slots:
            db.add(
                Availability(
                    date=request.date,
                    start_time=slot.start_time,
                    end_time=slot.end_time,
                )
            )
        await db.commit()
    except Exception:
        # Same rollback-and-re-raise handling as copy_availability.
        await db.rollback()
        raise

    return AvailabilityDay(date=request.date, slots=request.slots)
@router.post("/copy", response_model=AvailabilityResponse)
async def copy_availability(
    request: CopyAvailabilityRequest,
    db: AsyncSession = Depends(get_db),
    _current_user: User = Depends(require_permission(Permission.MANAGE_AVAILABILITY)),
) -> AvailabilityResponse:
    """Copy availability from one day to multiple target days.

    Existing availability on each target date is deleted and replaced by a
    copy of the source date's slots. A target equal to the source date is
    skipped, and a target date listed more than once is processed once.

    Args:
        request: The source date and the list of target dates.
        db: Database session dependency.
        _current_user: Dependency result of the ``MANAGE_AVAILABILITY``
            permission check; not used in the body.

    Returns:
        One ``AvailabilityDay`` per date that was written, in the order the
        dates first appear in the request.

    Raises:
        HTTPException: With status 400 when the source date or any target
            date is outside the allowed range, or when the source date has
            no availability to copy.
    """
    min_date, max_date = _get_date_range_bounds()

    # The source date is held to the same range rule as the targets.
    _validate_date_in_range(request.source_date, min_date, max_date)
    for candidate in request.target_dates:
        _validate_date_in_range(candidate, min_date, max_date)

    # Load the slots to copy, ordered by start time.
    result = await db.execute(
        select(Availability)
        .where(Availability.date == request.source_date)
        .order_by(Availability.start_time)
    )
    source_slots = result.scalars().all()
    if not source_slots:
        raise HTTPException(
            status_code=400,
            detail=f"No availability found for source date {request.source_date}",
        )

    # Drop the source date (copying onto itself is a no-op) and collapse
    # repeated dates, keeping first-seen order, so that each date is written
    # and reported exactly once.
    target_dates = [
        target
        for target in dict.fromkeys(request.target_dates)
        if target != request.source_date
    ]

    # All deletes and inserts are issued before the single commit, so the
    # copy is applied as a whole or not at all.
    copied_days: list[AvailabilityDay] = []
    try:
        for target_date in target_dates:
            # Clear whatever is currently stored for the target date.
            await db.execute(
                delete(Availability).where(Availability.date == target_date)
            )

            target_slots: list[TimeSlot] = []
            for source_slot in source_slots:
                db.add(
                    Availability(
                        date=target_date,
                        start_time=source_slot.start_time,
                        end_time=source_slot.end_time,
                    )
                )
                target_slots.append(
                    TimeSlot(
                        start_time=source_slot.start_time,
                        end_time=source_slot.end_time,
                    )
                )
            copied_days.append(AvailabilityDay(date=target_date, slots=target_slots))

        await db.commit()
    except Exception:
        # Undo any partial work before propagating the error.
        await db.rollback()
        raise

    return AvailabilityResponse(days=copied_days)