arbret/backend/services/auth.py
counterweight 04333d210b
Delegate user persistence to UserRepository
- Add create() and update() methods to UserRepository
- Update ProfileService to use repository.update()
- Update AuthService to use repository.create()
2025-12-25 18:52:23 +01:00

114 lines
3.4 KiB
Python

"""Authentication service for user registration and login."""
from datetime import UTC, datetime

from sqlalchemy.ext.asyncio import AsyncSession

from auth import (
    authenticate_user,
    create_access_token,
    get_password_hash,
)
from exceptions import BadRequestError, UnauthorizedError
from invite_utils import normalize_identifier
from models import ROLE_REGULAR, InviteStatus, User
from repositories.invite import InviteRepository
from repositories.role import RoleRepository
from repositories.user import UserRepository


class AuthService:
    """Service for authentication-related business logic.

    Wraps the user, invite and role repositories around a single database
    session and exposes the registration and login flows.
    """

    def __init__(self, db: AsyncSession):
        """Bind the service and its repositories to one database session.

        Args:
            db: Session shared by the service and all of its repositories.
        """
        self.db = db
        self.user_repo = UserRepository(db)
        self.invite_repo = InviteRepository(db)
        self.role_repo = RoleRepository(db)

    async def register_user(
        self, email: str, password: str, invite_identifier: str
    ) -> tuple[User, str]:
        """Register a new user using an invite code.

        The invite is marked as spent and the session is committed as part
        of this call.

        Args:
            email: User email address.
            password: Plain text password (will be hashed).
            invite_identifier: Invite code identifier; normalized before
                the lookup.

        Returns:
            Tuple of (User, access_token).

        Raises:
            BadRequestError: If the invite does not exist, is spent or is
                revoked, or if the email is already registered.
        """
        # Validate invite
        normalized_identifier = normalize_identifier(invite_identifier)
        invite = await self.invite_repo.get_by_identifier(normalized_identifier)

        # Return the same error for not found, spent, and revoked
        # to avoid information leakage.
        if not invite or invite.status in (
            InviteStatus.SPENT,
            InviteStatus.REVOKED,
        ):
            raise BadRequestError("Invalid invite code")

        # Check email not already taken
        existing_user = await self.user_repo.get_by_email(email)
        if existing_user:
            raise BadRequestError("Email already registered")

        # Create user with godfather (taken from the invite)
        user = User(
            email=email,
            hashed_password=get_password_hash(password),
            godfather_id=invite.godfather_id,
        )

        # Assign default role. If the role is not found the user is still
        # created, just without any role attached.
        default_role = await self.role_repo.get_by_name(ROLE_REGULAR)
        if default_role:
            user.roles.append(default_role)

        # Persist the user. The repository is expected to flush so that
        # user.id is populated before it is written onto the invite below.
        user = await self.user_repo.create(user)

        # Mark invite as spent
        invite.status = InviteStatus.SPENT
        invite.used_by_id = user.id
        invite.spent_at = datetime.now(UTC)

        await self.db.commit()
        await self.db.refresh(user)

        # Create access token; the subject claim is the user id as a string.
        access_token = create_access_token(data={"sub": str(user.id)})
        return user, access_token

    async def login_user(self, email: str, password: str) -> tuple[User, str]:
        """Authenticate a user and create an access token.

        Args:
            email: User email address.
            password: Plain text password.

        Returns:
            Tuple of (User, access_token).

        Raises:
            UnauthorizedError: If authentication fails. The same message is
                used for every failure.
        """
        user = await authenticate_user(self.db, email, password)
        if not user:
            raise UnauthorizedError("Incorrect email or password")

        # Create access token; the subject claim is the user id as a string.
        access_token = create_access_token(data={"sub": str(user.id)})
        return user, access_token