- Add create(), update(), and reload_with_relationships() methods to InviteRepository - Update InviteService to use repository methods instead of direct db operations
193 lines
6.1 KiB
Python
193 lines
6.1 KiB
Python
"""Invite service for managing invites."""
|
|
|
|
from datetime import UTC, datetime
|
|
|
|
from sqlalchemy.exc import IntegrityError
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from exceptions import BadRequestError, ConflictError, NotFoundError
|
|
from invite_utils import (
|
|
generate_invite_identifier,
|
|
is_valid_identifier_format,
|
|
normalize_identifier,
|
|
)
|
|
from mappers import InviteMapper
|
|
from models import Invite, InviteStatus
|
|
from pagination import create_paginated_response
|
|
from repositories.invite import InviteRepository
|
|
from schemas import (
|
|
InviteCheckResponse,
|
|
PaginatedInviteRecords,
|
|
)
|
|
from utils.enum_validation import validate_enum
|
|
|
|
# Upper bound on how many times InviteService.create_invite generates a fresh
# identifier and attempts the insert; an IntegrityError on the final attempt
# is surfaced to the caller as a ConflictError.
MAX_INVITE_COLLISION_RETRIES = 3
|
|
|
|
|
|
class InviteService:
    """Business-logic layer for invites, backed by InviteRepository."""

    def __init__(self, db: AsyncSession):
        # The raw session is kept alongside the repository because
        # create_invite needs it for rollbacks and for the user lookup.
        self.db = db
        self.invite_repo = InviteRepository(db)

    async def check_invite_validity(self, identifier: str) -> InviteCheckResponse:
        """
        Report whether an invite identifier can currently be used.

        Args:
            identifier: Invite identifier as supplied by the caller; it is
                normalized before any other check.

        Returns:
            InviteCheckResponse with valid=True and the invite's status value,
            or valid=False and an error message.
        """
        code = normalize_identifier(identifier)

        # Reject malformed codes up front so no query is issued for them.
        if not is_valid_identifier_format(code):
            return InviteCheckResponse(valid=False, error="Invalid invite code format")

        record = await self.invite_repo.get_by_identifier(code)

        if record and record.status not in (
            InviteStatus.SPENT,
            InviteStatus.REVOKED,
        ):
            return InviteCheckResponse(valid=True, status=record.status.value)

        # Missing, spent and revoked invites deliberately share one message
        # so the response does not leak which of the three applies.
        return InviteCheckResponse(valid=False, error="Invite not found")

    async def get_user_invites(self, user_id: int) -> list[Invite]:
        """
        Fetch every invite whose godfather is the given user.

        Args:
            user_id: ID of the godfather user

        Returns:
            Invite records as returned by the repository when asked for
            descending order
        """
        owned = await self.invite_repo.get_by_godfather_id(
            user_id, order_by_desc=True
        )
        return owned

    async def list_invites(
        self,
        page: int,
        per_page: int,
        status_filter: str | None = None,
        godfather_id: int | None = None,
    ) -> PaginatedInviteRecords:
        """
        Return one page of invites, optionally filtered.

        Args:
            page: Page number to fetch
            per_page: Page size
            status_filter: Optional status name; a falsy value (None or an
                empty string) means "no status filter"
            godfather_id: Optional godfather user ID to filter on

        Returns:
            Result of create_paginated_response built from the mapped invites,
            the total count, and the paging arguments

        Raises:
            Whatever validate_enum raises for an unrecognised status_filter
        """
        status_enum = (
            validate_enum(InviteStatus, status_filter, "status")
            if status_filter
            else None
        )

        # The same filters drive both the count and the page query.
        filters = {"status": status_enum, "godfather_id": godfather_id}

        total = await self.invite_repo.count(**filters)
        page_items = await self.invite_repo.list_paginated(
            page=page,
            per_page=per_page,
            **filters,
        )

        records = list(map(InviteMapper.to_response, page_items))
        return create_paginated_response(records, total, page, per_page)

    async def create_invite(self, godfather_id: int) -> Invite:
        """
        Create a READY invite owned by the given godfather user.

        A new identifier is generated for each attempt; an IntegrityError
        from the repository triggers a session rollback and another attempt,
        up to MAX_INVITE_COLLISION_RETRIES attempts in total.

        Args:
            godfather_id: ID of the godfather user

        Returns:
            The invite as returned by reload_with_relationships

        Raises:
            BadRequestError: If the godfather user does not exist, or if no
                invite object is available after the attempts
            ConflictError: If the final attempt still hits an IntegrityError
        """
        from repositories.user import UserRepository

        # The godfather must exist before anything is inserted.
        owner = await UserRepository(self.db).get_by_id(godfather_id)
        if not owner:
            raise BadRequestError("Godfather user not found")

        final_attempt = MAX_INVITE_COLLISION_RETRIES - 1
        created: Invite | None = None

        for attempt_no in range(MAX_INVITE_COLLISION_RETRIES):
            candidate = Invite(
                identifier=generate_invite_identifier(),
                godfather_id=godfather_id,
                status=InviteStatus.READY,
            )
            try:
                persisted = await self.invite_repo.create(candidate)
                # Re-fetch so the returned object has its relationships loaded.
                created = await self.invite_repo.reload_with_relationships(
                    persisted.id
                )
            except IntegrityError:
                # Reset the session before retrying or giving up.
                await self.db.rollback()
                if attempt_no == final_attempt:
                    raise ConflictError(
                        "Failed to generate unique invite code. Try again."
                    ) from None
            else:
                break

        if created is None:
            raise BadRequestError("Failed to create invite")
        return created

    async def revoke_invite(self, invite_id: int) -> Invite:
        """
        Mark a READY invite as REVOKED and stamp the revocation time.

        Args:
            invite_id: ID of the invite to revoke

        Returns:
            The invite as returned by the repository's update()

        Raises:
            NotFoundError: If no invite has this ID
            BadRequestError: If the invite's status is anything but READY
        """
        target = await self.invite_repo.get_by_id(invite_id)
        if not target:
            raise NotFoundError("Invite")

        if target.status != InviteStatus.READY:
            reason = (
                f"Cannot revoke invite with status '{target.status.value}'. "
                "Only READY invites can be revoked."
            )
            raise BadRequestError(reason)

        target.status = InviteStatus.REVOKED
        target.revoked_at = datetime.now(UTC)

        revoked = await self.invite_repo.update(target)
        return revoked
|