This commit is contained in:
counterweight 2025-12-26 19:21:34 +01:00
parent c0999370c6
commit 4e1a339432
Signed by: counterweight
GPG key ID: 883EDBAA726BD96C
17 changed files with 393 additions and 91 deletions

View file

@ -17,6 +17,7 @@ from routes import exchange as exchange_routes
from routes import invites as invites_routes
from routes import meta as meta_routes
from routes import profile as profile_routes
from routes import test as test_routes
from shared_constants import PRICE_REFRESH_SECONDS
from validate_constants import validate_shared_constants
@ -91,6 +92,7 @@ app.include_router(audit_routes.router)
app.include_router(profile_routes.router)
app.include_router(availability_routes.router)
app.include_router(meta_routes.router)
app.include_router(test_routes.router)
# Include routers - modules with multiple routers
for r in invites_routes.routers:

49
backend/routes/test.py Normal file
View file

@ -0,0 +1,49 @@
"""Test-only endpoints for e2e test isolation."""
import os
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from database import get_db
router = APIRouter(prefix="/api/test", tags=["test"])
@router.post("/reset")
async def reset_database(db: AsyncSession = Depends(get_db)):
"""
Truncate all tables and re-seed base data.
Only available when E2E_MODE environment variable is set.
"""
# Safety check - only allow in e2e mode
if not os.getenv("E2E_MODE"):
raise HTTPException(
status_code=403, detail="This endpoint is only available in E2E_MODE"
)
# Get all table names from the database
result = await db.execute(
text("""
SELECT tablename
FROM pg_tables
WHERE schemaname = 'public'
AND tablename != 'alembic_version'
ORDER BY tablename
""")
)
tables = [row[0] for row in result]
# Truncate all tables (CASCADE handles foreign keys)
if tables:
table_list = ", ".join(f'"{table}"' for table in tables)
await db.execute(text(f"TRUNCATE TABLE {table_list} CASCADE"))
await db.commit()
# Re-seed essential data
from seed_e2e import seed_base_data
await seed_base_data(db)
return {"status": "reset", "tables_truncated": len(tables)}

90
backend/seed_e2e.py Normal file
View file

@ -0,0 +1,90 @@
"""Fast re-seeding function for e2e tests - only seeds essential data."""
import os
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from auth import get_password_hash
from models import (
ROLE_ADMIN,
ROLE_DEFINITIONS,
ROLE_REGULAR,
Role,
User,
)
async def seed_base_data(db: AsyncSession) -> None:
"""
Seed only the minimal data needed for e2e tests:
- Roles (admin, regular) with permissions
- Test users (from env vars)
"""
# Get environment variables with defaults
dev_user_email = os.getenv("DEV_USER_EMAIL", "user@example.com")
dev_user_password = os.getenv("DEV_USER_PASSWORD", "user123")
dev_admin_email = os.getenv("DEV_ADMIN_EMAIL", "admin@example.com")
dev_admin_password = os.getenv("DEV_ADMIN_PASSWORD", "admin123")
# Create roles with permissions
for role_name, role_config in ROLE_DEFINITIONS.items():
result = await db.execute(select(Role).where(Role.name == role_name))
role = result.scalar_one_or_none()
if not role:
role = Role(name=role_name, description=role_config["description"])
db.add(role)
await db.flush() # Get the role ID
# Set permissions for the role
await role.set_permissions(db, role_config["permissions"])
await db.flush() # Ensure roles are committed before creating users
# Get roles for users
admin_role_result = await db.execute(select(Role).where(Role.name == ROLE_ADMIN))
admin_role = admin_role_result.scalar_one()
regular_role_result = await db.execute(
select(Role).where(Role.name == ROLE_REGULAR)
)
regular_role = regular_role_result.scalar_one()
# Create regular dev user
regular_user_result = await db.execute(
select(User).where(User.email == dev_user_email)
)
regular_user = regular_user_result.scalar_one_or_none()
if not regular_user:
regular_user = User(
email=dev_user_email,
hashed_password=get_password_hash(dev_user_password),
roles=[regular_role],
)
db.add(regular_user)
else:
# Update existing user
regular_user.hashed_password = get_password_hash(dev_user_password)
regular_user.roles = [regular_role]
# Create admin dev user
admin_user_result = await db.execute(
select(User).where(User.email == dev_admin_email)
)
admin_user = admin_user_result.scalar_one_or_none()
if not admin_user:
admin_user = User(
email=dev_admin_email,
hashed_password=get_password_hash(dev_admin_password),
roles=[admin_role],
)
db.add(admin_user)
else:
# Update existing user
admin_user.hashed_password = get_password_hash(dev_admin_password)
admin_user.roles = [admin_role]
await db.commit()