arbret/backend/routes/booking.py

280 lines
9.4 KiB
Python

"""Booking routes for users to book appointments."""
from datetime import date, datetime, time, timedelta, timezone
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import select, and_
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from auth import require_permission
from database import get_db
from models import User, Availability, Appointment, AppointmentStatus, Permission
from schemas import (
BookableSlot,
AvailableSlotsResponse,
BookingRequest,
AppointmentResponse,
)
router = APIRouter(prefix="/api/booking", tags=["booking"])
# From shared/constants.json
SLOT_DURATION_MINUTES = 15
MIN_ADVANCE_DAYS = 1
MAX_ADVANCE_DAYS = 30
def _get_bookable_date_range() -> tuple[date, date]:
"""Get the valid date range for booking (tomorrow to +30 days)."""
today = date.today()
min_date = today + timedelta(days=MIN_ADVANCE_DAYS)
max_date = today + timedelta(days=MAX_ADVANCE_DAYS)
return min_date, max_date
def _validate_booking_date(d: date) -> None:
"""Validate a date is within the bookable range."""
min_date, max_date = _get_bookable_date_range()
if d < min_date:
raise HTTPException(
status_code=400,
detail=f"Cannot book for today or past dates. Earliest bookable date: {min_date}",
)
if d > max_date:
raise HTTPException(
status_code=400,
detail=f"Cannot book more than {MAX_ADVANCE_DAYS} days ahead. Latest bookable: {max_date}",
)
def _expand_availability_to_slots(
availability_slots: list[Availability],
target_date: date,
) -> list[BookableSlot]:
"""Expand availability time ranges into 15-minute bookable slots."""
result: list[BookableSlot] = []
for avail in availability_slots:
# Create datetime objects for start and end
current = datetime.combine(target_date, avail.start_time, tzinfo=timezone.utc)
end = datetime.combine(target_date, avail.end_time, tzinfo=timezone.utc)
# Generate 15-minute slots
while current + timedelta(minutes=SLOT_DURATION_MINUTES) <= end:
slot_end = current + timedelta(minutes=SLOT_DURATION_MINUTES)
result.append(BookableSlot(start_time=current, end_time=slot_end))
current = slot_end
return result
@router.get("/slots", response_model=AvailableSlotsResponse)
async def get_available_slots(
target_date: date = Query(..., alias="date", description="Date to get slots for"),
db: AsyncSession = Depends(get_db),
_current_user: User = Depends(require_permission(Permission.BOOK_APPOINTMENT)),
) -> AvailableSlotsResponse:
"""Get available booking slots for a specific date."""
_validate_booking_date(target_date)
# Get availability for this date
result = await db.execute(
select(Availability)
.where(Availability.date == target_date)
.order_by(Availability.start_time)
)
availability_slots = result.scalars().all()
if not availability_slots:
return AvailableSlotsResponse(date=target_date, slots=[])
# Expand to 15-minute slots
all_slots = _expand_availability_to_slots(availability_slots, target_date)
# Get existing booked appointments for this date
day_start = datetime.combine(target_date, time.min, tzinfo=timezone.utc)
day_end = datetime.combine(target_date, time.max, tzinfo=timezone.utc)
result = await db.execute(
select(Appointment.slot_start)
.where(
and_(
Appointment.slot_start >= day_start,
Appointment.slot_start <= day_end,
Appointment.status == AppointmentStatus.BOOKED,
)
)
)
booked_starts = {row[0] for row in result.fetchall()}
# Filter out already booked slots
available_slots = [
slot for slot in all_slots
if slot.start_time not in booked_starts
]
return AvailableSlotsResponse(date=target_date, slots=available_slots)
@router.post("", response_model=AppointmentResponse)
async def create_booking(
request: BookingRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(require_permission(Permission.BOOK_APPOINTMENT)),
) -> AppointmentResponse:
"""Book an appointment slot."""
slot_date = request.slot_start.date()
_validate_booking_date(slot_date)
# Validate slot is on 15-minute boundary
if request.slot_start.minute not in (0, 15, 30, 45):
raise HTTPException(
status_code=400,
detail="Slot start time must be on 15-minute boundary",
)
if request.slot_start.second != 0 or request.slot_start.microsecond != 0:
raise HTTPException(
status_code=400,
detail="Slot start time must not have seconds or microseconds",
)
# Verify slot falls within availability
slot_start_time = request.slot_start.time()
slot_end_time = (request.slot_start + timedelta(minutes=SLOT_DURATION_MINUTES)).time()
result = await db.execute(
select(Availability)
.where(
and_(
Availability.date == slot_date,
Availability.start_time <= slot_start_time,
Availability.end_time >= slot_end_time,
)
)
)
matching_availability = result.scalar_one_or_none()
if not matching_availability:
raise HTTPException(
status_code=400,
detail="Selected slot is not within available time ranges",
)
# Create the appointment
slot_end = request.slot_start + timedelta(minutes=SLOT_DURATION_MINUTES)
appointment = Appointment(
user_id=current_user.id,
slot_start=request.slot_start,
slot_end=slot_end,
note=request.note,
status=AppointmentStatus.BOOKED,
)
db.add(appointment)
try:
await db.commit()
await db.refresh(appointment)
except IntegrityError:
await db.rollback()
raise HTTPException(
status_code=409,
detail="This slot has already been booked. Please select another slot.",
)
return AppointmentResponse(
id=appointment.id,
user_id=appointment.user_id,
user_email=current_user.email,
slot_start=appointment.slot_start,
slot_end=appointment.slot_end,
note=appointment.note,
status=appointment.status.value,
created_at=appointment.created_at,
cancelled_at=appointment.cancelled_at,
)
# =============================================================================
# User's Appointments Endpoints
# =============================================================================
appointments_router = APIRouter(prefix="/api/appointments", tags=["appointments"])
@appointments_router.get("", response_model=list[AppointmentResponse])
async def get_my_appointments(
db: AsyncSession = Depends(get_db),
current_user: User = Depends(require_permission(Permission.VIEW_OWN_APPOINTMENTS)),
) -> list[AppointmentResponse]:
"""Get the current user's appointments, sorted by date (upcoming first)."""
result = await db.execute(
select(Appointment)
.where(Appointment.user_id == current_user.id)
.order_by(Appointment.slot_start.desc())
)
appointments = result.scalars().all()
return [
AppointmentResponse(
id=apt.id,
user_id=apt.user_id,
user_email=current_user.email,
slot_start=apt.slot_start,
slot_end=apt.slot_end,
note=apt.note,
status=apt.status.value,
created_at=apt.created_at,
cancelled_at=apt.cancelled_at,
)
for apt in appointments
]
@appointments_router.post("/{appointment_id}/cancel", response_model=AppointmentResponse)
async def cancel_my_appointment(
appointment_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(require_permission(Permission.CANCEL_OWN_APPOINTMENT)),
) -> AppointmentResponse:
"""Cancel one of the current user's appointments."""
# Get the appointment
result = await db.execute(
select(Appointment).where(Appointment.id == appointment_id)
)
appointment = result.scalar_one_or_none()
if not appointment:
raise HTTPException(status_code=404, detail="Appointment not found")
# Verify ownership
if appointment.user_id != current_user.id:
raise HTTPException(status_code=403, detail="Cannot cancel another user's appointment")
# Check if already cancelled
if appointment.status != AppointmentStatus.BOOKED:
raise HTTPException(
status_code=400,
detail=f"Cannot cancel appointment with status '{appointment.status.value}'"
)
# Cancel the appointment
appointment.status = AppointmentStatus.CANCELLED_BY_USER
appointment.cancelled_at = datetime.now(timezone.utc)
await db.commit()
await db.refresh(appointment)
return AppointmentResponse(
id=appointment.id,
user_id=appointment.user_id,
user_email=current_user.email,
slot_start=appointment.slot_start,
slot_end=appointment.slot_end,
note=appointment.note,
status=appointment.status.value,
created_at=appointment.created_at,
cancelled_at=appointment.cancelled_at,
)