Created shared_constants.py module that loads constants from the shared JSON file. Updated availability.py and booking.py to import from this module instead of hardcoding values. This ensures backend and frontend stay in sync with the same source of truth for booking configuration.
378 lines
13 KiB
Python
378 lines
13 KiB
Python
"""Booking routes for users to book appointments."""
|
|
from datetime import date, datetime, time, timedelta, timezone
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from sqlalchemy import select, and_
|
|
from sqlalchemy.exc import IntegrityError
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from auth import require_permission
|
|
from database import get_db
|
|
from models import User, Availability, Appointment, AppointmentStatus, Permission
|
|
from schemas import (
|
|
BookableSlot,
|
|
AvailableSlotsResponse,
|
|
BookingRequest,
|
|
AppointmentResponse,
|
|
)
|
|
from shared_constants import SLOT_DURATION_MINUTES, MIN_ADVANCE_DAYS, MAX_ADVANCE_DAYS
|
|
|
|
|
|
router = APIRouter(prefix="/api/booking", tags=["booking"])
|
|
|
|
|
|
def _get_bookable_date_range() -> tuple[date, date]:
|
|
"""Get the valid date range for booking (tomorrow to +30 days)."""
|
|
today = date.today()
|
|
min_date = today + timedelta(days=MIN_ADVANCE_DAYS)
|
|
max_date = today + timedelta(days=MAX_ADVANCE_DAYS)
|
|
return min_date, max_date
|
|
|
|
|
|
def _validate_booking_date(d: date) -> None:
|
|
"""Validate a date is within the bookable range."""
|
|
min_date, max_date = _get_bookable_date_range()
|
|
if d < min_date:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Cannot book for today or past dates. Earliest bookable date: {min_date}",
|
|
)
|
|
if d > max_date:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Cannot book more than {MAX_ADVANCE_DAYS} days ahead. Latest bookable: {max_date}",
|
|
)
|
|
|
|
|
|
def _expand_availability_to_slots(
|
|
availability_slots: list[Availability],
|
|
target_date: date,
|
|
) -> list[BookableSlot]:
|
|
"""Expand availability time ranges into 15-minute bookable slots."""
|
|
result: list[BookableSlot] = []
|
|
|
|
for avail in availability_slots:
|
|
# Create datetime objects for start and end
|
|
current = datetime.combine(target_date, avail.start_time, tzinfo=timezone.utc)
|
|
end = datetime.combine(target_date, avail.end_time, tzinfo=timezone.utc)
|
|
|
|
# Generate 15-minute slots
|
|
while current + timedelta(minutes=SLOT_DURATION_MINUTES) <= end:
|
|
slot_end = current + timedelta(minutes=SLOT_DURATION_MINUTES)
|
|
result.append(BookableSlot(start_time=current, end_time=slot_end))
|
|
current = slot_end
|
|
|
|
return result
|
|
|
|
|
|
@router.get("/slots", response_model=AvailableSlotsResponse)
|
|
async def get_available_slots(
|
|
target_date: date = Query(..., alias="date", description="Date to get slots for"),
|
|
db: AsyncSession = Depends(get_db),
|
|
_current_user: User = Depends(require_permission(Permission.BOOK_APPOINTMENT)),
|
|
) -> AvailableSlotsResponse:
|
|
"""Get available booking slots for a specific date."""
|
|
_validate_booking_date(target_date)
|
|
|
|
# Get availability for this date
|
|
result = await db.execute(
|
|
select(Availability)
|
|
.where(Availability.date == target_date)
|
|
.order_by(Availability.start_time)
|
|
)
|
|
availability_slots = result.scalars().all()
|
|
|
|
if not availability_slots:
|
|
return AvailableSlotsResponse(date=target_date, slots=[])
|
|
|
|
# Expand to 15-minute slots
|
|
all_slots = _expand_availability_to_slots(availability_slots, target_date)
|
|
|
|
# Get existing booked appointments for this date
|
|
day_start = datetime.combine(target_date, time.min, tzinfo=timezone.utc)
|
|
day_end = datetime.combine(target_date, time.max, tzinfo=timezone.utc)
|
|
|
|
result = await db.execute(
|
|
select(Appointment.slot_start)
|
|
.where(
|
|
and_(
|
|
Appointment.slot_start >= day_start,
|
|
Appointment.slot_start <= day_end,
|
|
Appointment.status == AppointmentStatus.BOOKED,
|
|
)
|
|
)
|
|
)
|
|
booked_starts = {row[0] for row in result.fetchall()}
|
|
|
|
# Filter out already booked slots
|
|
available_slots = [
|
|
slot for slot in all_slots
|
|
if slot.start_time not in booked_starts
|
|
]
|
|
|
|
return AvailableSlotsResponse(date=target_date, slots=available_slots)
|
|
|
|
|
|
@router.post("", response_model=AppointmentResponse)
|
|
async def create_booking(
|
|
request: BookingRequest,
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(require_permission(Permission.BOOK_APPOINTMENT)),
|
|
) -> AppointmentResponse:
|
|
"""Book an appointment slot."""
|
|
slot_date = request.slot_start.date()
|
|
_validate_booking_date(slot_date)
|
|
|
|
# Validate slot is on 15-minute boundary
|
|
if request.slot_start.minute not in (0, 15, 30, 45):
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Slot start time must be on 15-minute boundary",
|
|
)
|
|
if request.slot_start.second != 0 or request.slot_start.microsecond != 0:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Slot start time must not have seconds or microseconds",
|
|
)
|
|
|
|
# Verify slot falls within availability
|
|
slot_start_time = request.slot_start.time()
|
|
slot_end_time = (request.slot_start + timedelta(minutes=SLOT_DURATION_MINUTES)).time()
|
|
|
|
result = await db.execute(
|
|
select(Availability)
|
|
.where(
|
|
and_(
|
|
Availability.date == slot_date,
|
|
Availability.start_time <= slot_start_time,
|
|
Availability.end_time >= slot_end_time,
|
|
)
|
|
)
|
|
)
|
|
matching_availability = result.scalar_one_or_none()
|
|
|
|
if not matching_availability:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Selected slot is not within available time ranges",
|
|
)
|
|
|
|
# Create the appointment
|
|
slot_end = request.slot_start + timedelta(minutes=SLOT_DURATION_MINUTES)
|
|
appointment = Appointment(
|
|
user_id=current_user.id,
|
|
slot_start=request.slot_start,
|
|
slot_end=slot_end,
|
|
note=request.note,
|
|
status=AppointmentStatus.BOOKED,
|
|
)
|
|
|
|
db.add(appointment)
|
|
|
|
try:
|
|
await db.commit()
|
|
await db.refresh(appointment)
|
|
except IntegrityError:
|
|
await db.rollback()
|
|
raise HTTPException(
|
|
status_code=409,
|
|
detail="This slot has already been booked. Please select another slot.",
|
|
)
|
|
|
|
return AppointmentResponse(
|
|
id=appointment.id,
|
|
user_id=appointment.user_id,
|
|
user_email=current_user.email,
|
|
slot_start=appointment.slot_start,
|
|
slot_end=appointment.slot_end,
|
|
note=appointment.note,
|
|
status=appointment.status.value,
|
|
created_at=appointment.created_at,
|
|
cancelled_at=appointment.cancelled_at,
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# User's Appointments Endpoints
|
|
# =============================================================================
|
|
|
|
appointments_router = APIRouter(prefix="/api/appointments", tags=["appointments"])
|
|
|
|
|
|
async def _get_user_email(db: AsyncSession, user_id: int) -> str:
|
|
"""Get user email by ID."""
|
|
result = await db.execute(select(User.email).where(User.id == user_id))
|
|
email = result.scalar_one_or_none()
|
|
return email or "unknown"
|
|
|
|
|
|
@appointments_router.get("", response_model=list[AppointmentResponse])
|
|
async def get_my_appointments(
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(require_permission(Permission.VIEW_OWN_APPOINTMENTS)),
|
|
) -> list[AppointmentResponse]:
|
|
"""Get the current user's appointments, sorted by date (upcoming first)."""
|
|
result = await db.execute(
|
|
select(Appointment)
|
|
.where(Appointment.user_id == current_user.id)
|
|
.order_by(Appointment.slot_start.desc())
|
|
)
|
|
appointments = result.scalars().all()
|
|
|
|
return [
|
|
AppointmentResponse(
|
|
id=apt.id,
|
|
user_id=apt.user_id,
|
|
user_email=current_user.email,
|
|
slot_start=apt.slot_start,
|
|
slot_end=apt.slot_end,
|
|
note=apt.note,
|
|
status=apt.status.value,
|
|
created_at=apt.created_at,
|
|
cancelled_at=apt.cancelled_at,
|
|
)
|
|
for apt in appointments
|
|
]
|
|
|
|
|
|
@appointments_router.post("/{appointment_id}/cancel", response_model=AppointmentResponse)
|
|
async def cancel_my_appointment(
|
|
appointment_id: int,
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(require_permission(Permission.CANCEL_OWN_APPOINTMENT)),
|
|
) -> AppointmentResponse:
|
|
"""Cancel one of the current user's appointments."""
|
|
# Get the appointment
|
|
result = await db.execute(
|
|
select(Appointment).where(Appointment.id == appointment_id)
|
|
)
|
|
appointment = result.scalar_one_or_none()
|
|
|
|
if not appointment:
|
|
raise HTTPException(status_code=404, detail="Appointment not found")
|
|
|
|
# Verify ownership
|
|
if appointment.user_id != current_user.id:
|
|
raise HTTPException(status_code=403, detail="Cannot cancel another user's appointment")
|
|
|
|
# Check if already cancelled
|
|
if appointment.status != AppointmentStatus.BOOKED:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Cannot cancel appointment with status '{appointment.status.value}'"
|
|
)
|
|
|
|
# Check if appointment is in the past
|
|
if appointment.slot_start <= datetime.now(timezone.utc):
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Cannot cancel a past appointment"
|
|
)
|
|
|
|
# Cancel the appointment
|
|
appointment.status = AppointmentStatus.CANCELLED_BY_USER
|
|
appointment.cancelled_at = datetime.now(timezone.utc)
|
|
|
|
await db.commit()
|
|
await db.refresh(appointment)
|
|
|
|
return AppointmentResponse(
|
|
id=appointment.id,
|
|
user_id=appointment.user_id,
|
|
user_email=current_user.email,
|
|
slot_start=appointment.slot_start,
|
|
slot_end=appointment.slot_end,
|
|
note=appointment.note,
|
|
status=appointment.status.value,
|
|
created_at=appointment.created_at,
|
|
cancelled_at=appointment.cancelled_at,
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Admin Appointments Endpoints
|
|
# =============================================================================
|
|
|
|
admin_appointments_router = APIRouter(prefix="/api/admin/appointments", tags=["admin-appointments"])
|
|
|
|
|
|
@admin_appointments_router.get("", response_model=list[AppointmentResponse])
|
|
async def get_all_appointments(
|
|
db: AsyncSession = Depends(get_db),
|
|
_current_user: User = Depends(require_permission(Permission.VIEW_ALL_APPOINTMENTS)),
|
|
) -> list[AppointmentResponse]:
|
|
"""Get all appointments (admin only), sorted by date descending."""
|
|
result = await db.execute(
|
|
select(Appointment)
|
|
.order_by(Appointment.slot_start.desc())
|
|
)
|
|
appointments = result.scalars().all()
|
|
|
|
responses = []
|
|
for apt in appointments:
|
|
user_email = await _get_user_email(db, apt.user_id)
|
|
responses.append(AppointmentResponse(
|
|
id=apt.id,
|
|
user_id=apt.user_id,
|
|
user_email=user_email,
|
|
slot_start=apt.slot_start,
|
|
slot_end=apt.slot_end,
|
|
note=apt.note,
|
|
status=apt.status.value,
|
|
created_at=apt.created_at,
|
|
cancelled_at=apt.cancelled_at,
|
|
))
|
|
|
|
return responses
|
|
|
|
|
|
@admin_appointments_router.post("/{appointment_id}/cancel", response_model=AppointmentResponse)
|
|
async def admin_cancel_appointment(
|
|
appointment_id: int,
|
|
db: AsyncSession = Depends(get_db),
|
|
_current_user: User = Depends(require_permission(Permission.CANCEL_ANY_APPOINTMENT)),
|
|
) -> AppointmentResponse:
|
|
"""Cancel any appointment (admin only)."""
|
|
# Get the appointment
|
|
result = await db.execute(
|
|
select(Appointment).where(Appointment.id == appointment_id)
|
|
)
|
|
appointment = result.scalar_one_or_none()
|
|
|
|
if not appointment:
|
|
raise HTTPException(status_code=404, detail="Appointment not found")
|
|
|
|
# Check if already cancelled
|
|
if appointment.status != AppointmentStatus.BOOKED:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Cannot cancel appointment with status '{appointment.status.value}'"
|
|
)
|
|
|
|
# Check if appointment is in the past
|
|
if appointment.slot_start <= datetime.now(timezone.utc):
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Cannot cancel a past appointment"
|
|
)
|
|
|
|
# Cancel the appointment
|
|
appointment.status = AppointmentStatus.CANCELLED_BY_ADMIN
|
|
appointment.cancelled_at = datetime.now(timezone.utc)
|
|
|
|
await db.commit()
|
|
await db.refresh(appointment)
|
|
|
|
user_email = await _get_user_email(db, appointment.user_id)
|
|
|
|
return AppointmentResponse(
|
|
id=appointment.id,
|
|
user_id=appointment.user_id,
|
|
user_email=user_email,
|
|
slot_start=appointment.slot_start,
|
|
slot_end=appointment.slot_end,
|
|
note=appointment.note,
|
|
status=appointment.status.value,
|
|
created_at=appointment.created_at,
|
|
cancelled_at=appointment.cancelled_at,
|
|
)
|