- Created _get_valid_minute_boundaries() helper that derives valid minutes from SLOT_DURATION_MINUTES - Replaced hardcoded (0, 15, 30, 45) with dynamic calculation - Error message now includes valid minute values for better clarity
376 lines
13 KiB
Python
376 lines
13 KiB
Python
"""Booking routes for users to book appointments."""
|
|
from datetime import date, datetime, time, timedelta, timezone
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from sqlalchemy import select, and_, func
|
|
from sqlalchemy.exc import IntegrityError
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from auth import require_permission
|
|
from database import get_db
|
|
from models import User, Availability, Appointment, AppointmentStatus, Permission
|
|
from schemas import (
|
|
BookableSlot,
|
|
AvailableSlotsResponse,
|
|
BookingRequest,
|
|
AppointmentResponse,
|
|
PaginatedAppointments,
|
|
)
|
|
from shared_constants import SLOT_DURATION_MINUTES, MIN_ADVANCE_DAYS, MAX_ADVANCE_DAYS
|
|
|
|
|
|
router = APIRouter(prefix="/api/booking", tags=["booking"])
|
|
|
|
|
|
def _to_appointment_response(
|
|
appointment: Appointment,
|
|
user_email: str | None = None,
|
|
) -> AppointmentResponse:
|
|
"""Convert an Appointment model to AppointmentResponse schema.
|
|
|
|
Args:
|
|
appointment: The appointment model instance
|
|
user_email: Optional user email. If not provided, uses appointment.user.email
|
|
"""
|
|
email = user_email if user_email is not None else appointment.user.email
|
|
return AppointmentResponse(
|
|
id=appointment.id,
|
|
user_id=appointment.user_id,
|
|
user_email=email,
|
|
slot_start=appointment.slot_start,
|
|
slot_end=appointment.slot_end,
|
|
note=appointment.note,
|
|
status=appointment.status.value,
|
|
created_at=appointment.created_at,
|
|
cancelled_at=appointment.cancelled_at,
|
|
)
|
|
|
|
|
|
def _get_valid_minute_boundaries() -> tuple[int, ...]:
|
|
"""Get valid minute boundaries based on SLOT_DURATION_MINUTES.
|
|
|
|
Assumes SLOT_DURATION_MINUTES divides 60 evenly (e.g., 15 minutes = 0, 15, 30, 45).
|
|
"""
|
|
boundaries: list[int] = []
|
|
minute = 0
|
|
while minute < 60:
|
|
boundaries.append(minute)
|
|
minute += SLOT_DURATION_MINUTES
|
|
return tuple(boundaries)
|
|
|
|
|
|
def _get_bookable_date_range() -> tuple[date, date]:
|
|
"""Get the valid date range for booking (tomorrow to +30 days)."""
|
|
today = date.today()
|
|
min_date = today + timedelta(days=MIN_ADVANCE_DAYS)
|
|
max_date = today + timedelta(days=MAX_ADVANCE_DAYS)
|
|
return min_date, max_date
|
|
|
|
|
|
def _validate_booking_date(d: date) -> None:
|
|
"""Validate a date is within the bookable range."""
|
|
min_date, max_date = _get_bookable_date_range()
|
|
if d < min_date:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Cannot book for today or past dates. Earliest bookable date: {min_date}",
|
|
)
|
|
if d > max_date:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Cannot book more than {MAX_ADVANCE_DAYS} days ahead. Latest bookable: {max_date}",
|
|
)
|
|
|
|
|
|
def _expand_availability_to_slots(
|
|
availability_slots: list[Availability],
|
|
target_date: date,
|
|
) -> list[BookableSlot]:
|
|
"""Expand availability time ranges into 15-minute bookable slots."""
|
|
result: list[BookableSlot] = []
|
|
|
|
for avail in availability_slots:
|
|
# Create datetime objects for start and end
|
|
current = datetime.combine(target_date, avail.start_time, tzinfo=timezone.utc)
|
|
end = datetime.combine(target_date, avail.end_time, tzinfo=timezone.utc)
|
|
|
|
# Generate 15-minute slots
|
|
while current + timedelta(minutes=SLOT_DURATION_MINUTES) <= end:
|
|
slot_end = current + timedelta(minutes=SLOT_DURATION_MINUTES)
|
|
result.append(BookableSlot(start_time=current, end_time=slot_end))
|
|
current = slot_end
|
|
|
|
return result
|
|
|
|
|
|
@router.get("/slots", response_model=AvailableSlotsResponse)
|
|
async def get_available_slots(
|
|
target_date: date = Query(..., alias="date", description="Date to get slots for"),
|
|
db: AsyncSession = Depends(get_db),
|
|
_current_user: User = Depends(require_permission(Permission.BOOK_APPOINTMENT)),
|
|
) -> AvailableSlotsResponse:
|
|
"""Get available booking slots for a specific date."""
|
|
_validate_booking_date(target_date)
|
|
|
|
# Get availability for this date
|
|
result = await db.execute(
|
|
select(Availability)
|
|
.where(Availability.date == target_date)
|
|
.order_by(Availability.start_time)
|
|
)
|
|
availability_slots = result.scalars().all()
|
|
|
|
if not availability_slots:
|
|
return AvailableSlotsResponse(date=target_date, slots=[])
|
|
|
|
# Expand to 15-minute slots
|
|
all_slots = _expand_availability_to_slots(availability_slots, target_date)
|
|
|
|
# Get existing booked appointments for this date
|
|
day_start = datetime.combine(target_date, time.min, tzinfo=timezone.utc)
|
|
day_end = datetime.combine(target_date, time.max, tzinfo=timezone.utc)
|
|
|
|
result = await db.execute(
|
|
select(Appointment.slot_start)
|
|
.where(
|
|
and_(
|
|
Appointment.slot_start >= day_start,
|
|
Appointment.slot_start <= day_end,
|
|
Appointment.status == AppointmentStatus.BOOKED,
|
|
)
|
|
)
|
|
)
|
|
booked_starts = {row[0] for row in result.fetchall()}
|
|
|
|
# Filter out already booked slots
|
|
available_slots = [
|
|
slot for slot in all_slots
|
|
if slot.start_time not in booked_starts
|
|
]
|
|
|
|
return AvailableSlotsResponse(date=target_date, slots=available_slots)
|
|
|
|
|
|
@router.post("", response_model=AppointmentResponse)
|
|
async def create_booking(
|
|
request: BookingRequest,
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(require_permission(Permission.BOOK_APPOINTMENT)),
|
|
) -> AppointmentResponse:
|
|
"""Book an appointment slot."""
|
|
slot_date = request.slot_start.date()
|
|
_validate_booking_date(slot_date)
|
|
|
|
# Validate slot is on the correct minute boundary (derived from SLOT_DURATION_MINUTES)
|
|
valid_minutes = _get_valid_minute_boundaries()
|
|
if request.slot_start.minute not in valid_minutes:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Slot start time must be on {SLOT_DURATION_MINUTES}-minute boundary (valid minutes: {valid_minutes})",
|
|
)
|
|
if request.slot_start.second != 0 or request.slot_start.microsecond != 0:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Slot start time must not have seconds or microseconds",
|
|
)
|
|
|
|
# Verify slot falls within availability
|
|
slot_start_time = request.slot_start.time()
|
|
slot_end_time = (request.slot_start + timedelta(minutes=SLOT_DURATION_MINUTES)).time()
|
|
|
|
result = await db.execute(
|
|
select(Availability)
|
|
.where(
|
|
and_(
|
|
Availability.date == slot_date,
|
|
Availability.start_time <= slot_start_time,
|
|
Availability.end_time >= slot_end_time,
|
|
)
|
|
)
|
|
)
|
|
matching_availability = result.scalar_one_or_none()
|
|
|
|
if not matching_availability:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Selected slot is not within available time ranges",
|
|
)
|
|
|
|
# Create the appointment
|
|
slot_end = request.slot_start + timedelta(minutes=SLOT_DURATION_MINUTES)
|
|
appointment = Appointment(
|
|
user_id=current_user.id,
|
|
slot_start=request.slot_start,
|
|
slot_end=slot_end,
|
|
note=request.note,
|
|
status=AppointmentStatus.BOOKED,
|
|
)
|
|
|
|
db.add(appointment)
|
|
|
|
try:
|
|
await db.commit()
|
|
await db.refresh(appointment)
|
|
except IntegrityError:
|
|
await db.rollback()
|
|
raise HTTPException(
|
|
status_code=409,
|
|
detail="This slot has already been booked. Please select another slot.",
|
|
)
|
|
|
|
return _to_appointment_response(appointment, current_user.email)
|
|
|
|
|
|
# =============================================================================
|
|
# User's Appointments Endpoints
|
|
# =============================================================================
|
|
|
|
appointments_router = APIRouter(prefix="/api/appointments", tags=["appointments"])
|
|
|
|
|
|
@appointments_router.get("", response_model=list[AppointmentResponse])
|
|
async def get_my_appointments(
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(require_permission(Permission.VIEW_OWN_APPOINTMENTS)),
|
|
) -> list[AppointmentResponse]:
|
|
"""Get the current user's appointments, sorted by date (upcoming first)."""
|
|
result = await db.execute(
|
|
select(Appointment)
|
|
.where(Appointment.user_id == current_user.id)
|
|
.order_by(Appointment.slot_start.desc())
|
|
)
|
|
appointments = result.scalars().all()
|
|
|
|
return [
|
|
_to_appointment_response(apt, current_user.email)
|
|
for apt in appointments
|
|
]
|
|
|
|
|
|
@appointments_router.post("/{appointment_id}/cancel", response_model=AppointmentResponse)
|
|
async def cancel_my_appointment(
|
|
appointment_id: int,
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(require_permission(Permission.CANCEL_OWN_APPOINTMENT)),
|
|
) -> AppointmentResponse:
|
|
"""Cancel one of the current user's appointments."""
|
|
# Get the appointment
|
|
result = await db.execute(
|
|
select(Appointment).where(Appointment.id == appointment_id)
|
|
)
|
|
appointment = result.scalar_one_or_none()
|
|
|
|
if not appointment:
|
|
raise HTTPException(status_code=404, detail="Appointment not found")
|
|
|
|
# Verify ownership
|
|
if appointment.user_id != current_user.id:
|
|
raise HTTPException(status_code=403, detail="Cannot cancel another user's appointment")
|
|
|
|
# Check if already cancelled
|
|
if appointment.status != AppointmentStatus.BOOKED:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Cannot cancel appointment with status '{appointment.status.value}'"
|
|
)
|
|
|
|
# Check if appointment is in the past
|
|
if appointment.slot_start <= datetime.now(timezone.utc):
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Cannot cancel a past appointment"
|
|
)
|
|
|
|
# Cancel the appointment
|
|
appointment.status = AppointmentStatus.CANCELLED_BY_USER
|
|
appointment.cancelled_at = datetime.now(timezone.utc)
|
|
|
|
await db.commit()
|
|
await db.refresh(appointment)
|
|
|
|
return _to_appointment_response(appointment, current_user.email)
|
|
|
|
|
|
# =============================================================================
|
|
# Admin Appointments Endpoints
|
|
# =============================================================================
|
|
|
|
admin_appointments_router = APIRouter(prefix="/api/admin/appointments", tags=["admin-appointments"])
|
|
|
|
|
|
@admin_appointments_router.get("", response_model=PaginatedAppointments)
|
|
async def get_all_appointments(
|
|
page: int = Query(1, ge=1),
|
|
per_page: int = Query(10, ge=1, le=100),
|
|
db: AsyncSession = Depends(get_db),
|
|
_current_user: User = Depends(require_permission(Permission.VIEW_ALL_APPOINTMENTS)),
|
|
) -> PaginatedAppointments:
|
|
"""Get all appointments (admin only), sorted by date descending with pagination."""
|
|
# Get total count
|
|
count_result = await db.execute(select(func.count(Appointment.id)))
|
|
total = count_result.scalar() or 0
|
|
total_pages = (total + per_page - 1) // per_page if total > 0 else 1
|
|
|
|
# Get paginated appointments (user relationship is eager-loaded via lazy="joined")
|
|
offset = (page - 1) * per_page
|
|
result = await db.execute(
|
|
select(Appointment)
|
|
.order_by(Appointment.slot_start.desc())
|
|
.offset(offset)
|
|
.limit(per_page)
|
|
)
|
|
appointments = result.scalars().all()
|
|
|
|
# Build responses using the eager-loaded user relationship
|
|
records = [
|
|
_to_appointment_response(apt) # Uses eager-loaded relationship
|
|
for apt in appointments
|
|
]
|
|
|
|
return PaginatedAppointments(
|
|
records=records,
|
|
total=total,
|
|
page=page,
|
|
per_page=per_page,
|
|
total_pages=total_pages,
|
|
)
|
|
|
|
|
|
@admin_appointments_router.post("/{appointment_id}/cancel", response_model=AppointmentResponse)
|
|
async def admin_cancel_appointment(
|
|
appointment_id: int,
|
|
db: AsyncSession = Depends(get_db),
|
|
_current_user: User = Depends(require_permission(Permission.CANCEL_ANY_APPOINTMENT)),
|
|
) -> AppointmentResponse:
|
|
"""Cancel any appointment (admin only)."""
|
|
# Get the appointment
|
|
result = await db.execute(
|
|
select(Appointment).where(Appointment.id == appointment_id)
|
|
)
|
|
appointment = result.scalar_one_or_none()
|
|
|
|
if not appointment:
|
|
raise HTTPException(status_code=404, detail="Appointment not found")
|
|
|
|
# Check if already cancelled
|
|
if appointment.status != AppointmentStatus.BOOKED:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Cannot cancel appointment with status '{appointment.status.value}'"
|
|
)
|
|
|
|
# Check if appointment is in the past
|
|
if appointment.slot_start <= datetime.now(timezone.utc):
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Cannot cancel a past appointment"
|
|
)
|
|
|
|
# Cancel the appointment
|
|
appointment.status = AppointmentStatus.CANCELLED_BY_ADMIN
|
|
appointment.cancelled_at = datetime.now(timezone.utc)
|
|
|
|
await db.commit()
|
|
await db.refresh(appointment)
|
|
|
|
return _to_appointment_response(appointment) # Uses eager-loaded relationship
|