"""Authentication routes for register, login, logout, and current user.""" from datetime import datetime, UTC from fastapi import APIRouter, Depends, HTTPException, Response, status from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from auth import ( ACCESS_TOKEN_EXPIRE_MINUTES, COOKIE_NAME, COOKIE_SECURE, get_password_hash, get_user_by_email, authenticate_user, create_access_token, get_current_user, build_user_response, ) from database import get_db from invite_utils import normalize_identifier from models import User, Role, ROLE_REGULAR, Invite, InviteStatus from schemas import UserLogin, UserResponse, RegisterWithInvite router = APIRouter(prefix="/api/auth", tags=["auth"]) def set_auth_cookie(response: Response, token: str) -> None: """Set the authentication cookie on the response.""" response.set_cookie( key=COOKIE_NAME, value=token, httponly=True, secure=COOKIE_SECURE, samesite="lax", max_age=ACCESS_TOKEN_EXPIRE_MINUTES * 60, ) async def get_default_role(db: AsyncSession) -> Role | None: """Get the default 'regular' role for new users.""" result = await db.execute(select(Role).where(Role.name == ROLE_REGULAR)) return result.scalar_one_or_none() @router.post("/register", response_model=UserResponse) async def register( user_data: RegisterWithInvite, response: Response, db: AsyncSession = Depends(get_db), ) -> UserResponse: """Register a new user using an invite code.""" # Validate invite normalized_identifier = normalize_identifier(user_data.invite_identifier) result = await db.execute( select(Invite).where(Invite.identifier == normalized_identifier) ) invite = result.scalar_one_or_none() # Return same error for not found, spent, and revoked to avoid information leakage if not invite or invite.status in (InviteStatus.SPENT, InviteStatus.REVOKED): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid invite code", ) # Check email not already taken existing_user = await get_user_by_email(db, user_data.email) if existing_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered", ) # Create user with godfather user = User( email=user_data.email, hashed_password=get_password_hash(user_data.password), godfather_id=invite.godfather_id, ) # Assign default role default_role = await get_default_role(db) if default_role: user.roles.append(default_role) db.add(user) await db.flush() # Get user ID # Mark invite as spent invite.status = InviteStatus.SPENT invite.used_by_id = user.id invite.spent_at = datetime.now(UTC) await db.commit() await db.refresh(user) access_token = create_access_token(data={"sub": str(user.id)}) set_auth_cookie(response, access_token) return await build_user_response(user, db) @router.post("/login", response_model=UserResponse) async def login( user_data: UserLogin, response: Response, db: AsyncSession = Depends(get_db), ) -> UserResponse: """Authenticate a user and return their info with an auth cookie.""" user = await authenticate_user(db, user_data.email, user_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect email or password", ) access_token = create_access_token(data={"sub": str(user.id)}) set_auth_cookie(response, access_token) return await build_user_response(user, db) @router.post("/logout") async def logout(response: Response) -> dict[str, bool]: """Log out the current user by clearing their auth cookie.""" response.delete_cookie(key=COOKIE_NAME) return {"ok": True} @router.get("/me", response_model=UserResponse) async def get_me( current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ) -> UserResponse: """Get the current authenticated user's info.""" return await build_user_response(current_user, db)