arbret/backend/routes/availability.py

179 lines
6.1 KiB
Python
Raw Normal View History

"""Availability routes for admin to manage booking availability."""
from datetime import date
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import and_, delete, select
from sqlalchemy.ext.asyncio import AsyncSession
from auth import require_permission
from database import get_db
from date_validation import validate_date_in_range
from models import Availability, Permission, User
from schemas import (
AvailabilityDay,
AvailabilityResponse,
CopyAvailabilityRequest,
SetAvailabilityRequest,
TimeSlot,
)
# Router collecting the admin availability endpoints defined in this module;
# every route registered on it is served under the /api/admin/availability
# prefix and tagged "availability".
router = APIRouter(prefix="/api/admin/availability", tags=["availability"])
@router.get("", response_model=AvailabilityResponse)
async def get_availability(
    from_date: date = Query(..., alias="from", description="Start date (inclusive)"),
    to_date: date = Query(..., alias="to", description="End date (inclusive)"),
    db: AsyncSession = Depends(get_db),
    _current_user: User = Depends(require_permission(Permission.MANAGE_AVAILABILITY)),
) -> AvailabilityResponse:
    """Return stored availability between two dates, grouped per day.

    Args:
        from_date: First date of the range (inclusive); query parameter ``from``.
        to_date: Last date of the range (inclusive); query parameter ``to``.
        db: Database session provided by the ``get_db`` dependency.
        _current_user: Resolved through ``require_permission`` with
            ``Permission.MANAGE_AVAILABILITY``; not used in the body.

    Returns:
        An ``AvailabilityResponse`` with one ``AvailabilityDay`` per date that
        has at least one slot, in ascending date order. Dates without slots
        are omitted.

    Raises:
        HTTPException: 400 when ``from_date`` is later than ``to_date``.
    """
    # Reject an inverted range before touching the database.
    if to_date < from_date:
        raise HTTPException(
            status_code=400,
            detail="'from' date must be before or equal to 'to' date",
        )

    # Fetch every slot inside the inclusive range, ordered by day then start.
    in_range = and_(Availability.date >= from_date, Availability.date <= to_date)
    query = (
        select(Availability)
        .where(in_range)
        .order_by(Availability.date, Availability.start_time)
    )
    rows = (await db.execute(query)).scalars().all()

    # Bucket the rows by calendar day; rows arrive ordered by start time, so
    # each day's list keeps that order.
    slots_by_day: dict[date, list[TimeSlot]] = {}
    for row in rows:
        slots_by_day.setdefault(row.date, []).append(
            TimeSlot(start_time=row.start_time, end_time=row.end_time)
        )

    return AvailabilityResponse(
        days=[
            AvailabilityDay(date=day, slots=slots_by_day[day])
            for day in sorted(slots_by_day)
        ]
    )
@router.put("", response_model=AvailabilityDay)
async def set_availability(
    request: SetAvailabilityRequest,
    db: AsyncSession = Depends(get_db),
    _current_user: User = Depends(require_permission(Permission.MANAGE_AVAILABILITY)),
) -> AvailabilityDay:
    """Set availability for a specific date. Replaces any existing availability.

    Args:
        request: Carries the target ``date`` and the ``slots`` to store for it.
        db: Database session provided by the ``get_db`` dependency.
        _current_user: Resolved through ``require_permission`` with
            ``Permission.MANAGE_AVAILABILITY``; not used in the body.

    Returns:
        An ``AvailabilityDay`` echoing the request's date and slots.

    Raises:
        HTTPException: 400 when a slot's end time is not after its start time,
            or when two slots overlap. ``validate_date_in_range`` is also
            called on the date and presumably raises when the date is not
            allowed (its behavior is defined outside this module).
    """
    validate_date_in_range(request.date, context="set availability")

    # Validate each slot on its own first. The overlap check below compares
    # neighbouring intervals and is only meaningful for well-formed slots;
    # running it first could report an inverted slot as an "overlap".
    for slot in request.slots:
        if slot.end_time <= slot.start_time:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid time slot: end time {slot.end_time} "
                f"must be after start time {slot.start_time}",
            )

    # With slots sorted by start time, any overlap shows up between two
    # neighbours, so comparing adjacent pairs is sufficient.
    sorted_slots = sorted(request.slots, key=lambda s: s.start_time)
    for earlier, later in zip(sorted_slots, sorted_slots[1:]):
        if earlier.end_time > later.start_time:
            raise HTTPException(
                status_code=400,
                detail=f"Time slots overlap: slot ending at {earlier.end_time} "
                f"overlaps with slot starting at {later.start_time}",
            )

    try:
        # Delete existing availability for this date, then stage the new slots;
        # nothing is committed until both steps have succeeded.
        await db.execute(
            delete(Availability).where(Availability.date == request.date)
        )
        for slot in request.slots:
            db.add(
                Availability(
                    date=request.date,
                    start_time=slot.start_time,
                    end_time=slot.end_time,
                )
            )
        await db.commit()
    except Exception:
        # Roll back on any failure so the session is not left holding a
        # half-applied replacement (mirrors copy_availability's handling).
        await db.rollback()
        raise

    return AvailabilityDay(date=request.date, slots=request.slots)
@router.post("/copy", response_model=AvailabilityResponse)
async def copy_availability(
    request: CopyAvailabilityRequest,
    db: AsyncSession = Depends(get_db),
    _current_user: User = Depends(require_permission(Permission.MANAGE_AVAILABILITY)),
) -> AvailabilityResponse:
    """Copy availability from one day to multiple target days.

    Existing availability on each target date is deleted and replaced by a
    copy of the source date's slots. The source date itself is skipped if it
    appears among the targets, and a target date listed more than once is
    processed (and reported) only once.

    Args:
        request: Carries ``source_date`` and the list of ``target_dates``.
        db: Database session provided by the ``get_db`` dependency.
        _current_user: Resolved through ``require_permission`` with
            ``Permission.MANAGE_AVAILABILITY``; not used in the body.

    Returns:
        An ``AvailabilityResponse`` with one ``AvailabilityDay`` per distinct
        target date written, in the order the dates first appear in the
        request.

    Raises:
        HTTPException: 400 when the source date has no availability.
            ``validate_date_in_range`` is also called on the source and every
            target date and presumably raises when a date is not allowed (its
            behavior is defined outside this module).
    """
    # Validate source date is in range
    validate_date_in_range(request.source_date, context="copy from")
    # Validate target dates
    for target_date in request.target_dates:
        validate_date_in_range(target_date, context="copy to")

    # Get source availability, ordered by start time
    result = await db.execute(
        select(Availability)
        .where(Availability.date == request.source_date)
        .order_by(Availability.start_time)
    )
    source_slots = result.scalars().all()
    if not source_slots:
        raise HTTPException(
            status_code=400,
            detail=f"No availability found for source date {request.source_date}",
        )

    # Collapse repeated target dates (keeping first-seen order) and drop the
    # source date. Without this, a repeated date would be deleted and
    # re-inserted once per occurrence and listed several times in the response.
    # NOTE(review): assumes target dates are hashable `date` values - confirm
    # against CopyAvailabilityRequest.
    unique_targets = [
        target_date
        for target_date in dict.fromkeys(request.target_dates)
        if target_date != request.source_date
    ]

    # All deletes and inserts are staged first and committed once at the end,
    # so either every target date is updated or none is.
    copied_days: list[AvailabilityDay] = []
    try:
        for target_date in unique_targets:
            # Delete existing availability for target date
            await db.execute(
                delete(Availability).where(Availability.date == target_date)
            )

            # Copy slots
            target_slots: list[TimeSlot] = []
            for source_slot in source_slots:
                db.add(
                    Availability(
                        date=target_date,
                        start_time=source_slot.start_time,
                        end_time=source_slot.end_time,
                    )
                )
                target_slots.append(
                    TimeSlot(
                        start_time=source_slot.start_time,
                        end_time=source_slot.end_time,
                    )
                )
            copied_days.append(AvailabilityDay(date=target_date, slots=target_slots))

        # Commit all changes in one go
        await db.commit()
    except Exception:
        # Rollback on any error so no partial copy is left behind
        await db.rollback()
        raise

    return AvailabilityResponse(days=copied_days)