arbret/backend/routes/auth.py

135 lines
4.2 KiB
Python
Raw Normal View History

2025-12-20 22:18:14 +01:00
"""Authentication routes for register, login, logout, and current user."""
from datetime import UTC, datetime
2025-12-20 22:18:14 +01:00
from fastapi import APIRouter, Depends, HTTPException, Response, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from auth import (
ACCESS_TOKEN_EXPIRE_MINUTES,
COOKIE_NAME,
2025-12-20 22:38:39 +01:00
COOKIE_SECURE,
2025-12-20 22:18:14 +01:00
authenticate_user,
build_user_response,
2025-12-20 22:18:14 +01:00
create_access_token,
get_current_user,
get_password_hash,
get_user_by_email,
2025-12-20 22:18:14 +01:00
)
from database import get_db
from invite_utils import normalize_identifier
from models import ROLE_REGULAR, Invite, InviteStatus, Role, User
from schemas import RegisterWithInvite, UserLogin, UserResponse
2025-12-20 22:18:14 +01:00
router = APIRouter(prefix="/api/auth", tags=["auth"])
def set_auth_cookie(response: Response, token: str) -> None:
"""Set the authentication cookie on the response."""
response.set_cookie(
key=COOKIE_NAME,
value=token,
httponly=True,
2025-12-20 22:38:39 +01:00
secure=COOKIE_SECURE,
2025-12-20 22:18:14 +01:00
samesite="lax",
max_age=ACCESS_TOKEN_EXPIRE_MINUTES * 60,
)
async def get_default_role(db: AsyncSession) -> Role | None:
"""Get the default 'regular' role for new users."""
result = await db.execute(select(Role).where(Role.name == ROLE_REGULAR))
return result.scalar_one_or_none()
@router.post("/register", response_model=UserResponse)
async def register(
user_data: RegisterWithInvite,
response: Response,
db: AsyncSession = Depends(get_db),
) -> UserResponse:
"""Register a new user using an invite code."""
# Validate invite
normalized_identifier = normalize_identifier(user_data.invite_identifier)
query = select(Invite).where(Invite.identifier == normalized_identifier)
result = await db.execute(query)
2025-12-20 22:18:14 +01:00
invite = result.scalar_one_or_none()
# Return same error for not found, spent, and revoked to avoid information leakage
if not invite or invite.status in (InviteStatus.SPENT, InviteStatus.REVOKED):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid invite code",
)
# Check email not already taken
existing_user = await get_user_by_email(db, user_data.email)
if existing_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered",
)
# Create user with godfather
user = User(
email=user_data.email,
hashed_password=get_password_hash(user_data.password),
godfather_id=invite.godfather_id,
)
# Assign default role
default_role = await get_default_role(db)
if default_role:
user.roles.append(default_role)
db.add(user)
await db.flush() # Get user ID
# Mark invite as spent
invite.status = InviteStatus.SPENT
invite.used_by_id = user.id
invite.spent_at = datetime.now(UTC)
2025-12-20 22:18:14 +01:00
await db.commit()
await db.refresh(user)
access_token = create_access_token(data={"sub": str(user.id)})
set_auth_cookie(response, access_token)
return await build_user_response(user, db)
@router.post("/login", response_model=UserResponse)
async def login(
user_data: UserLogin,
response: Response,
db: AsyncSession = Depends(get_db),
) -> UserResponse:
"""Authenticate a user and return their info with an auth cookie."""
user = await authenticate_user(db, user_data.email, user_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
)
access_token = create_access_token(data={"sub": str(user.id)})
set_auth_cookie(response, access_token)
return await build_user_response(user, db)
@router.post("/logout")
async def logout(response: Response) -> dict[str, bool]:
"""Log out the current user by clearing their auth cookie."""
response.delete_cookie(key=COOKIE_NAME)
return {"ok": True}
@router.get("/me", response_model=UserResponse)
async def get_me(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
) -> UserResponse:
"""Get the current authenticated user's info."""
return await build_user_response(current_user, db)