Issue #5: Date validation logic was duplicated across availability and booking routes. Changes: - Add date_validation.py with shared utilities: - get_bookable_date_range: returns (min_date, max_date) tuple - validate_date_in_range: validates date with contextual errors - Update routes/availability.py to use shared utilities - Update routes/booking.py to use shared utilities - Remove redundant _get_date_range_bounds and _get_bookable_date_range - Error messages now include context (book, set availability, etc.)
379 lines
13 KiB
Python
379 lines
13 KiB
Python
"""Booking routes for users to book appointments."""
|
|
|
|
from collections.abc import Sequence
|
|
from datetime import UTC, date, datetime, time, timedelta
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from sqlalchemy import and_, func, select
|
|
from sqlalchemy.exc import IntegrityError
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import joinedload
|
|
|
|
from auth import require_permission
|
|
from database import get_db
|
|
from date_validation import validate_date_in_range
|
|
from models import Appointment, AppointmentStatus, Availability, Permission, User
|
|
from pagination import calculate_offset, create_paginated_response
|
|
from schemas import (
|
|
AppointmentResponse,
|
|
AvailableSlotsResponse,
|
|
BookableSlot,
|
|
BookingRequest,
|
|
PaginatedAppointments,
|
|
)
|
|
from shared_constants import SLOT_DURATION_MINUTES
|
|
|
|
router = APIRouter(prefix="/api/booking", tags=["booking"])
|
|
|
|
|
|
def _to_appointment_response(
|
|
appointment: Appointment,
|
|
user_email: str | None = None,
|
|
) -> AppointmentResponse:
|
|
"""Convert an Appointment model to AppointmentResponse schema.
|
|
|
|
Args:
|
|
appointment: The appointment model instance
|
|
user_email: Optional user email. If not provided, uses appointment.user.email
|
|
"""
|
|
email = user_email if user_email is not None else appointment.user.email
|
|
return AppointmentResponse(
|
|
id=appointment.id,
|
|
user_id=appointment.user_id,
|
|
user_email=email,
|
|
slot_start=appointment.slot_start,
|
|
slot_end=appointment.slot_end,
|
|
note=appointment.note,
|
|
status=appointment.status.value,
|
|
created_at=appointment.created_at,
|
|
cancelled_at=appointment.cancelled_at,
|
|
)
|
|
|
|
|
|
def _get_valid_minute_boundaries() -> tuple[int, ...]:
|
|
"""Get valid minute boundaries based on SLOT_DURATION_MINUTES.
|
|
|
|
Assumes SLOT_DURATION_MINUTES divides 60 evenly (e.g., 15 minutes = 0, 15, 30, 45).
|
|
"""
|
|
boundaries: list[int] = []
|
|
minute = 0
|
|
while minute < 60:
|
|
boundaries.append(minute)
|
|
minute += SLOT_DURATION_MINUTES
|
|
return tuple(boundaries)
|
|
|
|
|
|
def _validate_booking_date(d: date) -> None:
|
|
"""Validate a date is within the bookable range."""
|
|
validate_date_in_range(d, context="book")
|
|
|
|
|
|
def _expand_availability_to_slots(
|
|
availability_slots: Sequence[Availability],
|
|
target_date: date,
|
|
) -> list[BookableSlot]:
|
|
"""Expand availability time ranges into 15-minute bookable slots."""
|
|
result: list[BookableSlot] = []
|
|
|
|
for avail in availability_slots:
|
|
# Create datetime objects for start and end
|
|
current = datetime.combine(target_date, avail.start_time, tzinfo=UTC)
|
|
end = datetime.combine(target_date, avail.end_time, tzinfo=UTC)
|
|
|
|
# Generate 15-minute slots
|
|
while current + timedelta(minutes=SLOT_DURATION_MINUTES) <= end:
|
|
slot_end = current + timedelta(minutes=SLOT_DURATION_MINUTES)
|
|
result.append(BookableSlot(start_time=current, end_time=slot_end))
|
|
current = slot_end
|
|
|
|
return result
|
|
|
|
|
|
@router.get("/slots", response_model=AvailableSlotsResponse)
|
|
async def get_available_slots(
|
|
target_date: date = Query(..., alias="date", description="Date to get slots for"),
|
|
db: AsyncSession = Depends(get_db),
|
|
_current_user: User = Depends(require_permission(Permission.BOOK_APPOINTMENT)),
|
|
) -> AvailableSlotsResponse:
|
|
"""Get available booking slots for a specific date."""
|
|
_validate_booking_date(target_date)
|
|
|
|
# Get availability for this date
|
|
result = await db.execute(
|
|
select(Availability)
|
|
.where(Availability.date == target_date)
|
|
.order_by(Availability.start_time)
|
|
)
|
|
availability_slots = result.scalars().all()
|
|
|
|
if not availability_slots:
|
|
return AvailableSlotsResponse(date=target_date, slots=[])
|
|
|
|
# Expand to 15-minute slots
|
|
all_slots = _expand_availability_to_slots(availability_slots, target_date)
|
|
|
|
# Get existing booked appointments for this date
|
|
day_start = datetime.combine(target_date, time.min, tzinfo=UTC)
|
|
day_end = datetime.combine(target_date, time.max, tzinfo=UTC)
|
|
|
|
result = await db.execute(
|
|
select(Appointment.slot_start).where(
|
|
and_(
|
|
Appointment.slot_start >= day_start,
|
|
Appointment.slot_start <= day_end,
|
|
Appointment.status == AppointmentStatus.BOOKED,
|
|
)
|
|
)
|
|
)
|
|
booked_starts = {row[0] for row in result.fetchall()}
|
|
|
|
# Filter out already booked slots
|
|
available_slots = [
|
|
slot for slot in all_slots if slot.start_time not in booked_starts
|
|
]
|
|
|
|
return AvailableSlotsResponse(date=target_date, slots=available_slots)
|
|
|
|
|
|
@router.post("", response_model=AppointmentResponse)
|
|
async def create_booking(
|
|
request: BookingRequest,
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(require_permission(Permission.BOOK_APPOINTMENT)),
|
|
) -> AppointmentResponse:
|
|
"""Book an appointment slot."""
|
|
slot_date = request.slot_start.date()
|
|
_validate_booking_date(slot_date)
|
|
|
|
# Validate slot is on the correct minute boundary
|
|
valid_minutes = _get_valid_minute_boundaries()
|
|
if request.slot_start.minute not in valid_minutes:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Slot must be on {SLOT_DURATION_MINUTES}-minute boundary "
|
|
f"(valid minutes: {valid_minutes})",
|
|
)
|
|
if request.slot_start.second != 0 or request.slot_start.microsecond != 0:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Slot start time must not have seconds or microseconds",
|
|
)
|
|
|
|
# Verify slot falls within availability
|
|
slot_start_time = request.slot_start.time()
|
|
slot_end_dt = request.slot_start + timedelta(minutes=SLOT_DURATION_MINUTES)
|
|
slot_end_time = slot_end_dt.time()
|
|
|
|
result = await db.execute(
|
|
select(Availability).where(
|
|
and_(
|
|
Availability.date == slot_date,
|
|
Availability.start_time <= slot_start_time,
|
|
Availability.end_time >= slot_end_time,
|
|
)
|
|
)
|
|
)
|
|
matching_availability = result.scalar_one_or_none()
|
|
|
|
if not matching_availability:
|
|
slot_str = request.slot_start.strftime("%Y-%m-%d %H:%M")
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Selected slot at {slot_str} UTC is not within "
|
|
f"any available time ranges for {slot_date}",
|
|
)
|
|
|
|
# Create the appointment
|
|
slot_end = request.slot_start + timedelta(minutes=SLOT_DURATION_MINUTES)
|
|
appointment = Appointment(
|
|
user_id=current_user.id,
|
|
slot_start=request.slot_start,
|
|
slot_end=slot_end,
|
|
note=request.note,
|
|
status=AppointmentStatus.BOOKED,
|
|
)
|
|
|
|
db.add(appointment)
|
|
|
|
try:
|
|
await db.commit()
|
|
await db.refresh(appointment)
|
|
except IntegrityError:
|
|
await db.rollback()
|
|
raise HTTPException(
|
|
status_code=409,
|
|
detail="This slot has already been booked. Select another slot.",
|
|
) from None
|
|
|
|
return _to_appointment_response(appointment, current_user.email)
|
|
|
|
|
|
# =============================================================================
|
|
# User's Appointments Endpoints
|
|
# =============================================================================
|
|
|
|
appointments_router = APIRouter(prefix="/api/appointments", tags=["appointments"])
|
|
|
|
|
|
@appointments_router.get("", response_model=list[AppointmentResponse])
|
|
async def get_my_appointments(
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(require_permission(Permission.VIEW_OWN_APPOINTMENTS)),
|
|
) -> list[AppointmentResponse]:
|
|
"""Get the current user's appointments, sorted by date (upcoming first)."""
|
|
result = await db.execute(
|
|
select(Appointment)
|
|
.where(Appointment.user_id == current_user.id)
|
|
.order_by(Appointment.slot_start.desc())
|
|
)
|
|
appointments = result.scalars().all()
|
|
|
|
return [_to_appointment_response(apt, current_user.email) for apt in appointments]
|
|
|
|
|
|
@appointments_router.post(
|
|
"/{appointment_id}/cancel", response_model=AppointmentResponse
|
|
)
|
|
async def cancel_my_appointment(
|
|
appointment_id: int,
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(require_permission(Permission.CANCEL_OWN_APPOINTMENT)),
|
|
) -> AppointmentResponse:
|
|
"""Cancel one of the current user's appointments."""
|
|
# Get the appointment with eager loading of user relationship
|
|
result = await db.execute(
|
|
select(Appointment)
|
|
.options(joinedload(Appointment.user))
|
|
.where(Appointment.id == appointment_id)
|
|
)
|
|
appointment = result.scalar_one_or_none()
|
|
|
|
if not appointment:
|
|
raise HTTPException(
|
|
status_code=404,
|
|
detail=f"Appointment {appointment_id} not found",
|
|
)
|
|
|
|
# Verify ownership
|
|
if appointment.user_id != current_user.id:
|
|
raise HTTPException(
|
|
status_code=403,
|
|
detail="Cannot cancel another user's appointment",
|
|
)
|
|
|
|
# Check if already cancelled
|
|
if appointment.status != AppointmentStatus.BOOKED:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Cannot cancel: status is '{appointment.status.value}'",
|
|
)
|
|
|
|
# Check if appointment is in the past
|
|
if appointment.slot_start <= datetime.now(UTC):
|
|
apt_time = appointment.slot_start.strftime("%Y-%m-%d %H:%M")
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Cannot cancel appointment at {apt_time} UTC: "
|
|
"already started or in the past",
|
|
)
|
|
|
|
# Cancel the appointment
|
|
appointment.status = AppointmentStatus.CANCELLED_BY_USER
|
|
appointment.cancelled_at = datetime.now(UTC)
|
|
|
|
await db.commit()
|
|
await db.refresh(appointment)
|
|
|
|
return _to_appointment_response(appointment, current_user.email)
|
|
|
|
|
|
# =============================================================================
|
|
# Admin Appointments Endpoints
|
|
# =============================================================================
|
|
|
|
admin_appointments_router = APIRouter(
|
|
prefix="/api/admin/appointments", tags=["admin-appointments"]
|
|
)
|
|
|
|
|
|
@admin_appointments_router.get("", response_model=PaginatedAppointments)
|
|
async def get_all_appointments(
|
|
page: int = Query(1, ge=1),
|
|
per_page: int = Query(10, ge=1, le=100),
|
|
db: AsyncSession = Depends(get_db),
|
|
_current_user: User = Depends(require_permission(Permission.VIEW_ALL_APPOINTMENTS)),
|
|
) -> PaginatedAppointments:
|
|
"""Get all appointments (admin only), sorted by date descending with pagination."""
|
|
# Get total count
|
|
count_result = await db.execute(select(func.count(Appointment.id)))
|
|
total = count_result.scalar() or 0
|
|
|
|
# Get paginated appointments with explicit eager loading of user relationship
|
|
offset = calculate_offset(page, per_page)
|
|
result = await db.execute(
|
|
select(Appointment)
|
|
.options(joinedload(Appointment.user))
|
|
.order_by(Appointment.slot_start.desc())
|
|
.offset(offset)
|
|
.limit(per_page)
|
|
)
|
|
appointments = result.scalars().all()
|
|
|
|
# Build responses using the eager-loaded user relationship
|
|
records = [
|
|
_to_appointment_response(apt) # Uses eager-loaded relationship
|
|
for apt in appointments
|
|
]
|
|
|
|
return create_paginated_response(records, total, page, per_page)
|
|
|
|
|
|
@admin_appointments_router.post(
|
|
"/{appointment_id}/cancel", response_model=AppointmentResponse
|
|
)
|
|
async def admin_cancel_appointment(
|
|
appointment_id: int,
|
|
db: AsyncSession = Depends(get_db),
|
|
_current_user: User = Depends(
|
|
require_permission(Permission.CANCEL_ANY_APPOINTMENT)
|
|
),
|
|
) -> AppointmentResponse:
|
|
"""Cancel any appointment (admin only)."""
|
|
# Get the appointment with eager loading of user relationship
|
|
result = await db.execute(
|
|
select(Appointment)
|
|
.options(joinedload(Appointment.user))
|
|
.where(Appointment.id == appointment_id)
|
|
)
|
|
appointment = result.scalar_one_or_none()
|
|
|
|
if not appointment:
|
|
raise HTTPException(
|
|
status_code=404,
|
|
detail=f"Appointment {appointment_id} not found",
|
|
)
|
|
|
|
# Check if already cancelled
|
|
if appointment.status != AppointmentStatus.BOOKED:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Cannot cancel: status is '{appointment.status.value}'",
|
|
)
|
|
|
|
# Check if appointment is in the past
|
|
if appointment.slot_start <= datetime.now(UTC):
|
|
apt_time = appointment.slot_start.strftime("%Y-%m-%d %H:%M")
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Cannot cancel appointment at {apt_time} UTC: "
|
|
"already started or in the past",
|
|
)
|
|
|
|
# Cancel the appointment
|
|
appointment.status = AppointmentStatus.CANCELLED_BY_ADMIN
|
|
appointment.cancelled_at = datetime.now(UTC)
|
|
|
|
await db.commit()
|
|
await db.refresh(appointment)
|
|
|
|
return _to_appointment_response(appointment) # Uses eager-loaded relationship
|