"""Authentication service for user registration and login.""" from datetime import UTC, datetime from sqlalchemy.ext.asyncio import AsyncSession from auth import ( create_access_token, get_password_hash, ) from exceptions import BadRequestError, UnauthorizedError from invite_utils import normalize_identifier from models import ROLE_REGULAR, InviteStatus, User from repositories.invite import InviteRepository from repositories.role import RoleRepository from repositories.user import UserRepository class AuthService: """Service for authentication-related business logic.""" def __init__(self, db: AsyncSession): self.db = db self.user_repo = UserRepository(db) self.invite_repo = InviteRepository(db) self.role_repo = RoleRepository(db) async def register_user( self, email: str, password: str, invite_identifier: str ) -> tuple[User, str]: """ Register a new user using an invite code. Args: email: User email address password: Plain text password (will be hashed) invite_identifier: Invite code identifier Returns: Tuple of (User, access_token) Raises: BadRequestError: If invite is invalid, email already taken, or other validation fails """ # Validate invite normalized_identifier = normalize_identifier(invite_identifier) invite = await self.invite_repo.get_by_identifier(normalized_identifier) # Return same error for not found, spent, and revoked # to avoid information leakage if not invite or invite.status in ( InviteStatus.SPENT, InviteStatus.REVOKED, ): raise BadRequestError("Invalid invite code") # Check email not already taken existing_user = await self.user_repo.get_by_email(email) if existing_user: raise BadRequestError("Email already registered") # Create user with godfather user = User( email=email, hashed_password=get_password_hash(password), godfather_id=invite.godfather_id, ) # Assign default role default_role = await self.role_repo.get_by_name(ROLE_REGULAR) if default_role: user.roles.append(default_role) # Create user (flush to get ID) user = await self.user_repo.create(user) # Mark invite as spent invite.status = InviteStatus.SPENT invite.used_by_id = user.id invite.spent_at = datetime.now(UTC) await self.invite_repo.update(invite) # Refresh user to ensure it's up to date await self.user_repo.refresh(user) # Create access token access_token = create_access_token(data={"sub": str(user.id)}) return user, access_token async def login_user(self, email: str, password: str) -> tuple[User, str]: """ Authenticate a user and create access token. Args: email: User email address password: Plain text password Returns: Tuple of (User, access_token) Raises: BadRequestError: If authentication fails """ from auth import authenticate_user user = await authenticate_user(self.db, email, password) if not user: raise UnauthorizedError("Incorrect email or password") # Create access token access_token = create_access_token(data={"sub": str(user.id)}) return user, access_token