diff --git a/Makefile b/Makefile index e5fd5fe..8f588ed 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: install-backend install-frontend install backend frontend db db-stop db-ready db-seed dev test test-backend test-frontend test-e2e +.PHONY: install-backend install-frontend install backend frontend db db-stop db-ready db-seed dev test test-backend test-frontend test-e2e typecheck -include .env export @@ -52,3 +52,6 @@ test-e2e: ./scripts/e2e.sh test: test-backend test-frontend test-e2e + +typecheck: + cd backend && uv run mypy . diff --git a/backend/auth.py b/backend/auth.py index 87bfe07..d1f05fe 100644 --- a/backend/auth.py +++ b/backend/auth.py @@ -1,6 +1,5 @@ import os from datetime import datetime, timedelta, timezone -from typing import List, Optional import bcrypt from fastapi import Depends, HTTPException, Request, status @@ -30,8 +29,8 @@ UserLogin = UserCredentials class UserResponse(BaseModel): id: int email: str - roles: List[str] - permissions: List[str] + roles: list[str] + permissions: list[str] class TokenResponse(BaseModel): @@ -54,19 +53,20 @@ def get_password_hash(password: str) -> str: ).decode("utf-8") -def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: - to_encode = data.copy() +def create_access_token(data: dict[str, str], expires_delta: timedelta | None = None) -> str: + to_encode: dict[str, str | datetime] = dict(data) expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)) - to_encode.update({"exp": expire}) - return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) + to_encode["exp"] = expire + encoded: str = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) + return encoded -async def get_user_by_email(db: AsyncSession, email: str) -> Optional[User]: +async def get_user_by_email(db: AsyncSession, email: str) -> User | None: result = await db.execute(select(User).where(User.email == email)) return result.scalar_one_or_none() -async def authenticate_user(db: AsyncSession, email: str, password: str) -> Optional[User]: +async def authenticate_user(db: AsyncSession, email: str, password: str) -> User | None: user = await get_user_by_email(db, email) if not user or not verify_password(password, user.hashed_password): return None diff --git a/backend/main.py b/backend/main.py index 1a4975e..f21e7b9 100644 --- a/backend/main.py +++ b/backend/main.py @@ -1,6 +1,6 @@ from contextlib import asynccontextmanager from datetime import datetime -from typing import List +from typing import Any, Callable, Generic, TypeVar from fastapi import FastAPI, Depends, HTTPException, Response, status, Query from fastapi.middleware.cors import CORSMiddleware @@ -9,7 +9,43 @@ from sqlalchemy import select, func, desc from sqlalchemy.ext.asyncio import AsyncSession from database import engine, get_db, Base -from models import Counter, User, SumRecord, CounterRecord, Permission, Role +from models import Counter, User, SumRecord, CounterRecord, Permission, Role, ROLE_REGULAR + + +R = TypeVar("R", bound=BaseModel) + + +async def paginate_with_user_email( + db: AsyncSession, + model: type[SumRecord] | type[CounterRecord], + page: int, + per_page: int, + row_mapper: Callable[..., R], +) -> tuple[list[R], int, int]: + """ + Generic pagination helper for audit records that need user email. + + Returns: (records, total, total_pages) + """ + # Get total count + count_result = await db.execute(select(func.count(model.id))) + total = count_result.scalar() or 0 + total_pages = (total + per_page - 1) // per_page if total > 0 else 1 + + # Get paginated records with user email + offset = (page - 1) * per_page + query = ( + select(model, User.email) + .join(User, model.user_id == User.id) + .order_by(desc(model.created_at)) + .offset(offset) + .limit(per_page) + ) + result = await db.execute(query) + rows = result.all() + + records: list[R] = [row_mapper(record, email) for record, email in rows] + return records, total, total_pages from auth import ( ACCESS_TOKEN_EXPIRE_MINUTES, COOKIE_NAME, @@ -57,7 +93,7 @@ def set_auth_cookie(response: Response, token: str) -> None: async def get_default_role(db: AsyncSession) -> Role | None: """Get the default 'regular' role for new users.""" - result = await db.execute(select(Role).where(Role.name == "regular")) + result = await db.execute(select(Role).where(Role.name == ROLE_REGULAR)) return result.scalar_one_or_none() @@ -214,20 +250,30 @@ class SumRecordResponse(BaseModel): created_at: datetime -class PaginatedCounterRecords(BaseModel): - records: List[CounterRecordResponse] +RecordT = TypeVar("RecordT", bound=BaseModel) + + +class PaginatedResponse(BaseModel, Generic[RecordT]): + """Generic paginated response wrapper.""" + records: list[RecordT] total: int page: int per_page: int total_pages: int -class PaginatedSumRecords(BaseModel): - records: List[SumRecordResponse] - total: int - page: int - per_page: int - total_pages: int +PaginatedCounterRecords = PaginatedResponse[CounterRecordResponse] +PaginatedSumRecords = PaginatedResponse[SumRecordResponse] + + +def _map_counter_record(record: CounterRecord, email: str) -> CounterRecordResponse: + return CounterRecordResponse( + id=record.id, + user_email=email, + value_before=record.value_before, + value_after=record.value_after, + created_at=record.created_at, + ) @app.get("/api/audit/counter", response_model=PaginatedCounterRecords) @@ -237,34 +283,9 @@ async def get_counter_records( db: AsyncSession = Depends(get_db), _current_user: User = Depends(require_permission(Permission.VIEW_AUDIT)), ): - # Get total count - count_result = await db.execute(select(func.count(CounterRecord.id))) - total = count_result.scalar() or 0 - total_pages = (total + per_page - 1) // per_page if total > 0 else 1 - - # Get paginated records with user email - offset = (page - 1) * per_page - query = ( - select(CounterRecord, User.email) - .join(User, CounterRecord.user_id == User.id) - .order_by(desc(CounterRecord.created_at)) - .offset(offset) - .limit(per_page) + records, total, total_pages = await paginate_with_user_email( + db, CounterRecord, page, per_page, _map_counter_record ) - result = await db.execute(query) - rows = result.all() - - records = [ - CounterRecordResponse( - id=record.id, - user_email=email, - value_before=record.value_before, - value_after=record.value_after, - created_at=record.created_at, - ) - for record, email in rows - ] - return PaginatedCounterRecords( records=records, total=total, @@ -274,6 +295,17 @@ async def get_counter_records( ) +def _map_sum_record(record: SumRecord, email: str) -> SumRecordResponse: + return SumRecordResponse( + id=record.id, + user_email=email, + a=record.a, + b=record.b, + result=record.result, + created_at=record.created_at, + ) + + @app.get("/api/audit/sum", response_model=PaginatedSumRecords) async def get_sum_records( page: int = Query(1, ge=1), @@ -281,35 +313,9 @@ async def get_sum_records( db: AsyncSession = Depends(get_db), _current_user: User = Depends(require_permission(Permission.VIEW_AUDIT)), ): - # Get total count - count_result = await db.execute(select(func.count(SumRecord.id))) - total = count_result.scalar() or 0 - total_pages = (total + per_page - 1) // per_page if total > 0 else 1 - - # Get paginated records with user email - offset = (page - 1) * per_page - query = ( - select(SumRecord, User.email) - .join(User, SumRecord.user_id == User.id) - .order_by(desc(SumRecord.created_at)) - .offset(offset) - .limit(per_page) + records, total, total_pages = await paginate_with_user_email( + db, SumRecord, page, per_page, _map_sum_record ) - result = await db.execute(query) - rows = result.all() - - records = [ - SumRecordResponse( - id=record.id, - user_email=email, - a=record.a, - b=record.b, - result=record.result, - created_at=record.created_at, - ) - for record, email in rows - ] - return PaginatedSumRecords( records=records, total=total, diff --git a/backend/models.py b/backend/models.py index 3726c3f..378157a 100644 --- a/backend/models.py +++ b/backend/models.py @@ -1,12 +1,17 @@ from datetime import datetime, UTC from enum import Enum as PyEnum -from typing import List, Set +from typing import TypedDict from sqlalchemy import Integer, String, Float, DateTime, ForeignKey, Table, Column, Enum, select from sqlalchemy.orm import Mapped, mapped_column, relationship from sqlalchemy.ext.asyncio import AsyncSession from database import Base +class RoleConfig(TypedDict): + description: str + permissions: list["Permission"] + + class Permission(str, PyEnum): """All available permissions in the system.""" # Counter permissions @@ -20,15 +25,19 @@ class Permission(str, PyEnum): VIEW_AUDIT = "view_audit" +# Role name constants +ROLE_ADMIN = "admin" +ROLE_REGULAR = "regular" + # Role definitions with their permissions -ROLE_DEFINITIONS = { - "admin": { +ROLE_DEFINITIONS: dict[str, RoleConfig] = { + ROLE_ADMIN: { "description": "Administrator with audit access", "permissions": [ Permission.VIEW_AUDIT, ], }, - "regular": { + ROLE_REGULAR: { "description": "Regular user with counter and sum access", "permissions": [ Permission.VIEW_COUNTER, @@ -65,24 +74,20 @@ class Role(Base): description: Mapped[str] = mapped_column(String(255), nullable=True) # Relationship to users - users: Mapped[List["User"]] = relationship( + users: Mapped[list["User"]] = relationship( "User", secondary=user_roles, back_populates="roles", ) - async def get_permissions(self, db: AsyncSession) -> Set[Permission]: + async def get_permissions(self, db: AsyncSession) -> set[Permission]: """Get all permissions for this role.""" result = await db.execute( select(role_permissions.c.permission).where(role_permissions.c.role_id == self.id) ) return {row[0] for row in result.fetchall()} - async def add_permission(self, db: AsyncSession, permission: Permission) -> None: - """Add a permission to this role.""" - await db.execute(role_permissions.insert().values(role_id=self.id, permission=permission)) - - async def set_permissions(self, db: AsyncSession, permissions: List[Permission]) -> None: + async def set_permissions(self, db: AsyncSession, permissions: list[Permission]) -> None: """Set all permissions for this role (replaces existing).""" await db.execute(role_permissions.delete().where(role_permissions.c.role_id == self.id)) for perm in permissions: @@ -97,20 +102,21 @@ class User(Base): hashed_password: Mapped[str] = mapped_column(String(255), nullable=False) # Relationship to roles - roles: Mapped[List[Role]] = relationship( + roles: Mapped[list[Role]] = relationship( "Role", secondary=user_roles, back_populates="users", lazy="selectin", ) - async def get_permissions(self, db: AsyncSession) -> Set[Permission]: - """Get all permissions from all roles.""" - permissions: Set[Permission] = set() - for role in self.roles: - role_perms = await role.get_permissions(db) - permissions.update(role_perms) - return permissions + async def get_permissions(self, db: AsyncSession) -> set[Permission]: + """Get all permissions from all roles in a single query.""" + result = await db.execute( + select(role_permissions.c.permission) + .join(user_roles, role_permissions.c.role_id == user_roles.c.role_id) + .where(user_roles.c.user_id == self.id) + ) + return {row[0] for row in result.fetchall()} async def has_permission(self, db: AsyncSession, permission: Permission) -> bool: """Check if user has a specific permission through any of their roles.""" @@ -118,7 +124,7 @@ class User(Base): return permission in permissions @property - def role_names(self) -> List[str]: + def role_names(self) -> list[str]: """Get list of role names for API responses.""" return [role.name for role in self.roles] diff --git a/backend/pyproject.toml b/backend/pyproject.toml index 55938bb..381017e 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -18,5 +18,14 @@ dev = [ "pytest-asyncio>=0.25.0", "httpx>=0.28.1", "aiosqlite>=0.20.0", + "mypy>=1.13.0", ] +[tool.mypy] +python_version = "3.11" +warn_return_any = true +warn_unused_ignores = true +check_untyped_defs = true +ignore_missing_imports = true +exclude = ["tests/"] + diff --git a/backend/seed.py b/backend/seed.py index 300cd99..185c2a6 100644 --- a/backend/seed.py +++ b/backend/seed.py @@ -1,11 +1,11 @@ """Seed the database with roles, permissions, and dev users.""" import asyncio import os -from typing import List from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession from database import engine, async_session, Base -from models import User, Role, Permission, role_permissions, ROLE_DEFINITIONS +from models import User, Role, Permission, role_permissions, ROLE_DEFINITIONS, ROLE_REGULAR, ROLE_ADMIN from auth import get_password_hash DEV_USER_EMAIL = os.environ["DEV_USER_EMAIL"] @@ -14,7 +14,7 @@ DEV_ADMIN_EMAIL = os.environ["DEV_ADMIN_EMAIL"] DEV_ADMIN_PASSWORD = os.environ["DEV_ADMIN_PASSWORD"] -async def upsert_role(db, name: str, description: str, permissions: List[Permission]) -> Role: +async def upsert_role(db: AsyncSession, name: str, description: str, permissions: list[Permission]) -> Role: """Create or update a role with the given permissions.""" result = await db.execute(select(Role).where(Role.name == name)) role = result.scalar_one_or_none() @@ -35,7 +35,7 @@ async def upsert_role(db, name: str, description: str, permissions: List[Permiss return role -async def upsert_user(db, email: str, password: str, role_names: List[str]) -> User: +async def upsert_user(db: AsyncSession, email: str, password: str, role_names: list[str]) -> User: """Create or update a user with the given credentials and roles.""" result = await db.execute(select(User).where(User.email == email)) user = result.scalar_one_or_none() @@ -45,12 +45,13 @@ async def upsert_user(db, email: str, password: str, role_names: List[str]) -> U for role_name in role_names: result = await db.execute(select(Role).where(Role.name == role_name)) role = result.scalar_one_or_none() - if role: - roles.append(role) + if not role: + raise ValueError(f"Role '{role_name}' not found") + roles.append(role) if user: user.hashed_password = get_password_hash(password) - user.roles = roles + user.roles = roles # type: ignore[assignment] print(f"Updated user: {email} with roles: {role_names}") else: user = User( @@ -64,7 +65,7 @@ async def upsert_user(db, email: str, password: str, role_names: List[str]) -> U return user -async def seed(): +async def seed() -> None: async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) @@ -80,10 +81,10 @@ async def seed(): print("\n=== Seeding Users ===") # Create regular dev user - await upsert_user(db, DEV_USER_EMAIL, DEV_USER_PASSWORD, ["regular"]) + await upsert_user(db, DEV_USER_EMAIL, DEV_USER_PASSWORD, [ROLE_REGULAR]) # Create admin dev user - await upsert_user(db, DEV_ADMIN_EMAIL, DEV_ADMIN_PASSWORD, ["admin"]) + await upsert_user(db, DEV_ADMIN_EMAIL, DEV_ADMIN_PASSWORD, [ROLE_ADMIN]) await db.commit() print("\n=== Seeding Complete ===\n") diff --git a/backend/tests/conftest.py b/backend/tests/conftest.py index ba207c4..f363edf 100644 --- a/backend/tests/conftest.py +++ b/backend/tests/conftest.py @@ -1,6 +1,5 @@ import os from contextlib import asynccontextmanager -from typing import List # Set required env vars before importing app os.environ.setdefault("SECRET_KEY", "test-secret-key-for-testing-only") @@ -12,8 +11,9 @@ from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, Asyn from database import Base, get_db from main import app -from models import User, Role, Permission, ROLE_DEFINITIONS +from models import User, Role, Permission, ROLE_DEFINITIONS, ROLE_REGULAR, ROLE_ADMIN from auth import get_password_hash +from tests.helpers import unique_email TEST_DATABASE_URL = os.getenv( "TEST_DATABASE_URL", @@ -82,7 +82,7 @@ async def create_user_with_roles( db: AsyncSession, email: str, password: str, - role_names: List[str], + role_names: list[str], ) -> User: """Create a user with specified roles.""" # Get roles @@ -90,8 +90,9 @@ async def create_user_with_roles( for role_name in role_names: result = await db.execute(select(Role).where(Role.name == role_name)) role = result.scalar_one_or_none() - if role: - roles.append(role) + if not role: + raise ValueError(f"Role '{role_name}' not found. Did you run setup_roles()?") + roles.append(role) user = User( email=email, @@ -144,13 +145,11 @@ async def client(client_factory): @pytest.fixture(scope="function") async def regular_user(client_factory): """Create a regular user and return their credentials and cookies.""" - from tests.helpers import unique_email - email = unique_email("regular") password = "password123" async with client_factory.get_db_session() as db: - await create_user_with_roles(db, email, password, ["regular"]) + await create_user_with_roles(db, email, password, [ROLE_REGULAR]) # Login to get cookies response = await client_factory.post( @@ -169,13 +168,11 @@ async def regular_user(client_factory): @pytest.fixture(scope="function") async def admin_user(client_factory): """Create an admin user and return their credentials and cookies.""" - from tests.helpers import unique_email - email = unique_email("admin") password = "password123" async with client_factory.get_db_session() as db: - await create_user_with_roles(db, email, password, ["admin"]) + await create_user_with_roles(db, email, password, [ROLE_ADMIN]) # Login to get cookies response = await client_factory.post( @@ -194,8 +191,6 @@ async def admin_user(client_factory): @pytest.fixture(scope="function") async def user_no_roles(client_factory): """Create a user with NO roles and return their credentials and cookies.""" - from tests.helpers import unique_email - email = unique_email("noroles") password = "password123" diff --git a/frontend/app/audit/page.tsx b/frontend/app/audit/page.tsx index df37354..576eab5 100644 --- a/frontend/app/audit/page.tsx +++ b/frontend/app/audit/page.tsx @@ -393,4 +393,3 @@ const pageStyles: Record = { }; const styles = { ...sharedStyles, ...pageStyles }; - diff --git a/frontend/app/auth-context.tsx b/frontend/app/auth-context.tsx index 2884b26..705b143 100644 --- a/frontend/app/auth-context.tsx +++ b/frontend/app/auth-context.tsx @@ -28,7 +28,6 @@ interface AuthContextType { register: (email: string, password: string) => Promise; logout: () => Promise; hasPermission: (permission: PermissionType) => boolean; - hasAnyPermission: (...permissions: PermissionType[]) => boolean; hasRole: (role: string) => boolean; } @@ -104,10 +103,6 @@ export function AuthProvider({ children }: { children: ReactNode }) { return user?.permissions.includes(permission) ?? false; }, [user]); - const hasAnyPermission = useCallback((...permissions: PermissionType[]): boolean => { - return permissions.some((p) => user?.permissions.includes(p) ?? false); - }, [user]); - const hasRole = useCallback((role: string): boolean => { return user?.roles.includes(role) ?? false; }, [user]); @@ -121,7 +116,6 @@ export function AuthProvider({ children }: { children: ReactNode }) { register, logout, hasPermission, - hasAnyPermission, hasRole, }} > diff --git a/frontend/app/styles/shared.ts b/frontend/app/styles/shared.ts index ecaef96..86d98b2 100644 --- a/frontend/app/styles/shared.ts +++ b/frontend/app/styles/shared.ts @@ -79,4 +79,3 @@ export const sharedStyles: Record = { padding: "2rem", }, }; - diff --git a/frontend/app/sum/page.tsx b/frontend/app/sum/page.tsx index 075ec84..d1a39a6 100644 --- a/frontend/app/sum/page.tsx +++ b/frontend/app/sum/page.tsx @@ -288,4 +288,3 @@ const pageStyles: Record = { }; const styles = { ...sharedStyles, ...pageStyles }; -